Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5    AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6    PeersResponse,
7};
8#[cfg(test)]
9use pulpo_common::session::Runtime;
10use pulpo_common::session::Session;
11
// Top-level command-line definition, parsed by clap's derive macros.
//
// NOTE: the `///` comments on the fields below are picked up by clap as the
// `--help` text, so they are user-facing strings rather than plain comments.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    // Version string is read from the `PULPO_VERSION` environment variable at compile time.
    version = env!("PULPO_VERSION"),
    // NOTE(review): presumably set so the bare `pulpo <path>` positional can coexist
    // with subcommands — confirm the exact semantics against the clap documentation.
    args_conflicts_with_subcommands = true
)]
pub struct Cli {
    /// Target node (default: localhost)
    // The default value also carries the port (7433).
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    /// Auth token (auto-discovered from local daemon if omitted)
    #[arg(long)]
    pub token: Option<String>,

    // `None` when no subcommand was given (e.g. the quick-spawn form below).
    #[command(subcommand)]
    pub command: Option<Commands>,

    /// Quick spawn: `pulpo <path>` spawns a session in that directory
    #[arg(value_name = "PATH")]
    pub path: Option<String>,
}
35
// Every top-level `pulpo` subcommand.
//
// NOTE: the `///` comments on variants and fields are consumed by clap as help
// text, so they are user-facing strings rather than plain comments.
#[derive(Subcommand, Debug)]
// `Spawn` carries far more fields than the other variants, which is what the
// `large_enum_variant` lint flags; the lint is silenced here rather than boxing.
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    /// Attach to a session's terminal
    #[command(visible_alias = "a")]
    Attach {
        /// Session name or ID
        name: String,
    },

    /// Send input to a session
    #[command(visible_alias = "i", visible_alias = "send")]
    Input {
        /// Session name or ID
        name: String,
        /// Text to send (sends Enter if omitted)
        text: Option<String>,
    },

    /// Spawn a new agent session
    #[command(visible_alias = "s")]
    Spawn {
        /// Session name (auto-generated from workdir if omitted)
        name: Option<String>,

        /// Working directory (defaults to current directory)
        #[arg(long)]
        workdir: Option<String>,

        /// Ink name (from config)
        #[arg(long)]
        ink: Option<String>,

        /// Human-readable description of the task
        #[arg(long)]
        description: Option<String>,

        /// Don't attach to the session after spawning
        #[arg(short, long)]
        detach: bool,

        /// Idle threshold in seconds (0 = never idle)
        #[arg(long)]
        idle_threshold: Option<u32>,

        /// Auto-select the least loaded node
        #[arg(long)]
        auto: bool,

        /// Create an isolated git worktree for the session
        #[arg(long)]
        worktree: bool,

        /// Base branch to fork the worktree from (implies --worktree)
        #[arg(long = "worktree-base")]
        worktree_base: Option<String>,

        /// Runtime environment: tmux (default) or docker
        // Kept as a free-form string at the CLI layer; validation is not visible here.
        #[arg(long)]
        runtime: Option<String>,

        /// Secrets to inject as environment variables (by name)
        // A `Vec`, so the flag may be repeated.
        #[arg(long)]
        secret: Vec<String>,

        /// Command to run (everything after --)
        #[arg(last = true)]
        command: Vec<String>,
    },

    /// List sessions (live only by default)
    #[command(visible_alias = "ls")]
    List {
        /// Show all sessions including stopped and lost
        #[arg(short, long)]
        all: bool,
    },

    /// Show session logs/output
    #[command(visible_alias = "l")]
    Logs {
        /// Session name or ID
        name: String,

        /// Number of lines to fetch
        #[arg(long, default_value = "100")]
        lines: usize,

        /// Follow output (like `tail -f`)
        #[arg(short, long)]
        follow: bool,
    },

    /// Stop one or more sessions
    // `k` is shown in help; `kill` is accepted but hidden.
    #[command(visible_alias = "k", alias = "kill")]
    Stop {
        /// Session names or IDs
        #[arg(required = true)]
        names: Vec<String>,

        /// Also purge the session from history
        #[arg(long, short = 'p')]
        purge: bool,
    },

    /// Remove all stopped and lost sessions
    Cleanup,

    /// Resume a lost session
    #[command(visible_alias = "r")]
    Resume {
        /// Session name or ID
        name: String,
    },

    /// List all known nodes
    #[command(visible_alias = "n")]
    Nodes,

    /// Show intervention history for a session
    #[command(visible_alias = "iv")]
    Interventions {
        /// Session name or ID
        name: String,
    },

    /// Open the web dashboard in your browser
    Ui,

    /// Manage scheduled agent runs
    #[command(visible_alias = "sched")]
    Schedule {
        #[command(subcommand)]
        action: ScheduleAction,
    },

    /// Manage secrets (environment variables injected into sessions)
    #[command(visible_alias = "sec")]
    Secret {
        #[command(subcommand)]
        action: SecretAction,
    },

    /// Manage git worktrees for sessions
    #[command(visible_alias = "wt")]
    Worktree {
        #[command(subcommand)]
        action: WorktreeAction,
    },
}
186
// Nested subcommands of `pulpo secret`.
// The `///` comments below double as clap help text (user-facing).
#[derive(Subcommand, Debug)]
pub enum SecretAction {
    /// Set a secret
    Set {
        /// Secret name (will be the env var name, uppercase + underscores)
        name: String,
        /// Secret value
        value: String,
        /// Environment variable name (defaults to secret name)
        #[arg(long)]
        env: Option<String>,
    },
    /// List secret names
    #[command(visible_alias = "ls")]
    List,
    /// Delete a secret
    #[command(visible_alias = "rm")]
    Delete {
        /// Secret name
        name: String,
    },
}
209
// Nested subcommands of `pulpo worktree`; only listing is defined here.
// The `///` comment below doubles as clap help text (user-facing).
#[derive(Subcommand, Debug)]
pub enum WorktreeAction {
    /// List sessions that use git worktrees
    #[command(visible_alias = "ls")]
    List,
}
216
// Nested subcommands of `pulpo schedule`.
// The `///` comments below double as clap help text (user-facing).
// Unlike the other command enums in this file, the aliases here use `alias`
// (hidden from help) rather than `visible_alias`.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    /// Add a new schedule
    #[command(alias = "install")]
    Add {
        /// Schedule name
        name: String,
        /// Cron expression (e.g. "0 3 * * *")
        cron: String,
        /// Working directory
        #[arg(long)]
        workdir: Option<String>,
        /// Target node (omit = local, "auto" = least-loaded)
        #[arg(long)]
        node: Option<String>,
        /// Ink preset
        #[arg(long)]
        ink: Option<String>,
        /// Description
        #[arg(long)]
        description: Option<String>,
        /// Command to run (everything after --)
        #[arg(last = true)]
        command: Vec<String>,
    },
    /// List all schedules
    #[command(alias = "ls")]
    List,
    /// Remove a schedule
    #[command(alias = "rm")]
    Remove {
        /// Schedule name or ID
        name: String,
    },
    /// Pause a schedule
    Pause {
        /// Schedule name or ID
        name: String,
    },
    /// Resume a paused schedule
    Resume {
        /// Schedule name or ID
        name: String,
    },
}
262
/// The marker emitted by the agent wrapper when the agent process exits.
///
/// Log following stops as soon as fetched output contains this exact text.
const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
265
/// Turn `path` into an absolute path string.
///
/// Absolute inputs are returned unchanged. Relative inputs are joined onto the
/// current working directory; if the working directory cannot be determined the
/// input is returned as-is. No normalisation (`.`/`..`, symlinks) is performed.
fn resolve_path(path: &str) -> String {
    let candidate = std::path::Path::new(path);
    if candidate.is_absolute() {
        return path.to_owned();
    }
    match std::env::current_dir() {
        Ok(cwd) => cwd.join(candidate).to_string_lossy().into_owned(),
        // Best effort: without a cwd there is nothing to anchor the path to.
        Err(_) => path.to_owned(),
    }
}
278
/// Derive a kebab-case session name from the last component of `path`.
///
/// ASCII letters and digits are kept (lowercased); every other character acts as
/// a separator. Runs of separators collapse into a single hyphen and the result
/// never starts or ends with a hyphen. Falls back to `"session"` when the path
/// has no usable final component or nothing survives the filtering.
fn derive_session_name(path: &str) -> String {
    let basename = std::path::Path::new(path)
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or("session");

    let mut slug = String::with_capacity(basename.len());
    for ch in basename.chars() {
        if ch.is_ascii_alphanumeric() {
            slug.push(ch.to_ascii_lowercase());
        } else if !slug.is_empty() && !slug.ends_with('-') {
            // Emit at most one hyphen per separator run, and none at the start.
            slug.push('-');
        }
    }
    // At most one trailing hyphen can remain after the loop.
    if slug.ends_with('-') {
        slug.pop();
    }

    if slug.is_empty() {
        "session".to_owned()
    } else {
        slug
    }
}
311
312/// Deduplicate a session name by appending `-2`, `-3`, etc. if the base name is active.
313async fn deduplicate_session_name(
314    client: &reqwest::Client,
315    base: &str,
316    name: &str,
317    token: Option<&str>,
318) -> String {
319    // Check if the name is already taken by fetching the session
320    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
321        .send()
322        .await;
323    match resp {
324        Ok(r) if r.status().is_success() => {
325            // Session exists — try suffixed names
326            for i in 2..=99 {
327                let candidate = format!("{name}-{i}");
328                let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
329                    .send()
330                    .await;
331                match resp {
332                    Ok(r) if r.status().is_success() => {}
333                    _ => return candidate,
334                }
335            }
336            format!("{name}-100")
337        }
338        _ => name.to_owned(),
339    }
340}
341
/// Build the HTTP base URL for a node address by prefixing it with `http://`.
///
/// The address is used verbatim; no validation or normalisation is applied.
pub fn base_url(node: &str) -> String {
    ["http://", node].concat()
}
346
/// Response shape for the output endpoint.
///
/// Only the `output` field is read from the JSON body.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// Captured session output text as returned by the server.
    output: String,
}
352
353/// Format the repo column: `basename@branch +42/-7 ↑3` with diff stats and ahead count.
354/// Truncates to 30 chars if needed.
355/// Format the branch column: branch name + diff stats + ahead count.
356fn format_branch(session: &Session) -> String {
357    let branch = session.git_branch.as_deref().unwrap_or("-").to_owned();
358
359    let mut suffix = String::new();
360    let ins = session.git_insertions.unwrap_or(0);
361    let del = session.git_deletions.unwrap_or(0);
362    if ins > 0 || del > 0 {
363        suffix = format!(" +{ins}/-{del}");
364    }
365    if let Some(ahead) = session.git_ahead
366        && ahead > 0
367    {
368        suffix = format!("{suffix} \u{2191}{ahead}");
369    }
370
371    format!("{branch}{suffix}")
372}
373
374/// Build a display name with badges: [wt] [PR] [!]
375fn format_name(session: &Session) -> String {
376    let mut name = session.name.clone();
377    if session.worktree_path.is_some() {
378        name = format!("{name} [wt]");
379    }
380    if session
381        .metadata
382        .as_ref()
383        .is_some_and(|m| m.contains_key("pr_url"))
384    {
385        name = format!("{name} [PR]");
386    }
387    if session
388        .metadata
389        .as_ref()
390        .is_some_and(|m| m.contains_key("error_status"))
391    {
392        name = format!("{name} [!]");
393    }
394    name
395}
396
/// Truncate a string to at most `max` characters, ending with `...` when cut.
///
/// Length is measured in Unicode scalar values (`char`s), not bytes, so text
/// containing multi-byte characters (e.g. `↑`) that already fits is returned
/// unchanged. When truncation is needed, the first `max - 3` characters are kept
/// and `...` is appended; for `max < 3` the result is just `...`.
fn truncate(s: &str, max: usize) -> String {
    // Compare in chars: the cut below is char-based, so the fit check must be too.
    if s.chars().count() <= max {
        return s.to_owned();
    }
    let mut shortened: String = s.chars().take(max.saturating_sub(3)).collect();
    shortened.push_str("...");
    shortened
}
406
407fn format_sessions(sessions: &[Session]) -> String {
408    if sessions.is_empty() {
409        return "No sessions.".into();
410    }
411
412    // Compute dynamic column widths from data
413    let rows: Vec<(String, String, String, String, String)> = sessions
414        .iter()
415        .map(|s| {
416            (
417                s.id.to_string()[..8].to_owned(),
418                format_name(s),
419                s.status.to_string(),
420                format_branch(s),
421                s.command.clone(),
422            )
423        })
424        .collect();
425
426    let w_id = 8;
427    let w_name = rows.iter().map(|r| r.1.len()).max().unwrap_or(4).max(4);
428    let w_status = 8;
429    let w_branch = rows.iter().map(|r| r.3.len()).max().unwrap_or(6).max(6);
430
431    let mut lines = vec![format!(
432        "{:<w_id$}  {:<w_name$}  {:<w_status$}  {:<w_branch$}  {}",
433        "ID", "NAME", "STATUS", "BRANCH", "COMMAND"
434    )];
435    for (id, name, status, branch, cmd) in &rows {
436        lines.push(format!(
437            "{:<w_id$}  {:<w_name$}  {:<w_status$}  {:<w_branch$}  {}",
438            id,
439            truncate(name, w_name),
440            status,
441            truncate(branch, w_branch),
442            truncate(cmd, 50)
443        ));
444    }
445    lines.join("\n")
446}
447
448/// Format the peers response as a table.
449fn format_nodes(resp: &PeersResponse) -> String {
450    let mut lines = vec![format!(
451        "{:<20} {:<25} {:<10} {}",
452        "NAME", "ADDRESS", "STATUS", "SESSIONS"
453    )];
454    lines.push(format!(
455        "{:<20} {:<25} {:<10} {}",
456        resp.local.name, "(local)", "online", "-"
457    ));
458    for p in &resp.peers {
459        let sessions = p
460            .session_count
461            .map_or_else(|| "-".into(), |c| c.to_string());
462        lines.push(format!(
463            "{:<20} {:<25} {:<10} {}",
464            p.name, p.address, p.status, sessions
465        ));
466    }
467    lines.join("\n")
468}
469
470/// Format intervention events as a table.
471fn format_interventions(events: &[InterventionEventResponse]) -> String {
472    if events.is_empty() {
473        return "No intervention events.".into();
474    }
475    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
476    for e in events {
477        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
478    }
479    lines.join("\n")
480}
481
/// Build (but do not run) the command that opens `url` in the default browser.
///
/// Uses `open` on macOS, `cmd /C start` on Windows and `xdg-open` everywhere
/// else (Linux and any other target). The URL is always the final argument.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    // Select the launcher program and any arguments that must precede the URL.
    #[cfg(target_os = "macos")]
    let (program, leading): (&str, &[&str]) = ("open", &[]);
    #[cfg(target_os = "windows")]
    let (program, leading): (&str, &[&str]) = ("cmd", &["/C", "start"]);
    #[cfg(not(any(target_os = "macos", target_os = "windows")))]
    let (program, leading): (&str, &[&str]) = ("xdg-open", &[]);

    let mut cmd = std::process::Command::new(program);
    cmd.args(leading).arg(url);
    cmd
}
511
512/// Open a URL in the default browser.
513#[cfg(not(coverage))]
514fn open_browser(url: &str) -> Result<()> {
515    build_open_command(url).status()?;
516    Ok(())
517}
518
/// Stub for coverage builds — avoids opening a browser during tests.
///
/// Ignores `_url` and always returns `Ok(())`.
#[cfg(coverage)]
fn open_browser(_url: &str) -> Result<()> {
    Ok(())
}
524
/// Build (but do not run) the command that attaches to a session's terminal.
///
/// A backend id of the form `docker:<container>` yields
/// `docker exec -it <container> /bin/sh`. Any other id is treated as a tmux
/// target and yields `tmux attach-session -t <id>`; on Windows, where that is
/// unavailable, the command merely echoes an explanatory message.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    let docker_target = backend_session_id.strip_prefix("docker:");
    if let Some(container) = docker_target {
        let mut docker = std::process::Command::new("docker");
        docker.arg("exec").arg("-it").arg(container).arg("/bin/sh");
        return docker;
    }

    // Not a Docker session: pick the platform-appropriate program and arguments.
    #[cfg(not(target_os = "windows"))]
    let (program, argv) = ("tmux", ["attach-session", "-t", backend_session_id]);
    #[cfg(target_os = "windows")]
    let (program, argv) = (
        "cmd",
        [
            "/C",
            "echo",
            "Attach not available on Windows. Use the web UI or --runtime docker.",
        ],
    );

    let mut attach = std::process::Command::new(program);
    attach.args(argv);
    attach
}
554
555/// Attach to a session's terminal.
556#[cfg(not(any(test, coverage, target_os = "windows")))]
557fn attach_session(backend_session_id: &str) -> Result<()> {
558    let status = build_attach_command(backend_session_id).status()?;
559    if !status.success() {
560        anyhow::bail!("attach failed with {status}");
561    }
562    Ok(())
563}
564
/// Stub for Windows — tmux attach is not available.
///
/// Prints a hint to stderr and returns `Ok(())` without spawning anything; the
/// backend session id is ignored.
#[cfg(all(target_os = "windows", not(test), not(coverage)))]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
    Ok(())
}
571
/// Stub for test and coverage builds — avoids spawning real terminals during tests.
///
/// Ignores the backend session id and always returns `Ok(())`.
#[cfg(any(test, coverage))]
// The `Result` return and non-`const` signature are flagged by these lints for
// such a trivial body; they are silenced instead of changing the signature.
#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    Ok(())
}
578
579/// Extract a clean error message from an API JSON response (or fall back to raw text).
580fn api_error(text: &str) -> anyhow::Error {
581    serde_json::from_str::<serde_json::Value>(text)
582        .ok()
583        .and_then(|v| v["error"].as_str().map(String::from))
584        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
585}
586
587/// Return the response body text, or a clean error if the response was non-success.
588async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
589    if resp.status().is_success() {
590        Ok(resp.text().await?)
591    } else {
592        let text = resp.text().await?;
593        Err(api_error(&text))
594    }
595}
596
597/// Map a reqwest error to a user-friendly message.
598fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
599    if err.is_connect() {
600        anyhow::anyhow!(
601            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
602        )
603    } else {
604        anyhow::anyhow!("Network error connecting to {node}: {err}")
605    }
606}
607
/// Report whether a node address refers to the local machine.
///
/// Recognises `localhost` and `127.0.0.1` (with or without a `:port` suffix),
/// the bracketed IPv6 loopback `[::1]` (any suffix) and the bare `::1`.
/// Matching is exact and case-sensitive.
fn is_localhost(node: &str) -> bool {
    // IPv6 loopback forms contain colons, so handle them before splitting on ':'.
    if node == "::1" || node.starts_with("[::1]") {
        return true;
    }
    let host = match node.split_once(':') {
        Some((before_port, _)) => before_port,
        None => node,
    };
    matches!(host, "localhost" | "127.0.0.1")
}
613
614/// Try to auto-discover the auth token from a local daemon.
615async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
616    let resp = client
617        .get(format!("{base}/api/v1/auth/token"))
618        .send()
619        .await
620        .ok()?;
621    let body: AuthTokenResponse = resp.json().await.ok()?;
622    if body.token.is_empty() {
623        None
624    } else {
625        Some(body.token)
626    }
627}
628
629/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
630async fn resolve_token(
631    client: &reqwest::Client,
632    base: &str,
633    node: &str,
634    explicit: Option<&str>,
635) -> Option<String> {
636    if let Some(t) = explicit {
637        return Some(t.to_owned());
638    }
639    if is_localhost(node) {
640        return discover_token(client, base).await;
641    }
642    None
643}
644
/// Report whether a node string still needs resolving to `host:port`.
///
/// Any string containing a `:` is considered to already carry a port.
fn node_needs_resolution(node: &str) -> bool {
    node.find(':').is_none()
}
649
650/// Resolve a node reference to a `host:port` address.
651///
652/// If `node` looks like `host:port` (contains `:`), return as-is with no peer token.
653/// Otherwise, query the local daemon's peer registry for a matching name. If a matching
654/// online peer is found, return its address and optionally its configured auth token
655/// (from the config endpoint). Falls back to appending `:7433` if the peer is not found.
656#[cfg(not(coverage))]
657async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
658    // Already has port — use as-is
659    if !node_needs_resolution(node) {
660        return (node.to_owned(), None);
661    }
662
663    // Try to resolve via local daemon's peer registry
664    let local_base = "http://localhost:7433";
665    let mut resolved_address: Option<String> = None;
666
667    if let Ok(resp) = client
668        .get(format!("{local_base}/api/v1/peers"))
669        .send()
670        .await
671        && let Ok(peers_resp) = resp.json::<PeersResponse>().await
672    {
673        for peer in &peers_resp.peers {
674            if peer.name == node {
675                resolved_address = Some(peer.address.clone());
676                break;
677            }
678        }
679    }
680
681    let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
682
683    // Try to get the peer's auth token from the config endpoint
684    let peer_token = if let Ok(resp) = client
685        .get(format!("{local_base}/api/v1/config"))
686        .send()
687        .await
688        && let Ok(config) = resp.json::<ConfigResponse>().await
689        && let Some(entry) = config.peers.get(node)
690    {
691        entry.token().map(String::from)
692    } else {
693        None
694    };
695
696    (address, peer_token)
697}
698
/// Coverage-build variant of node resolution that performs no HTTP requests.
///
/// Appends `:7433` to references without a port, leaves others untouched, and
/// never returns a peer token.
#[cfg(coverage)]
async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
    let address = if node_needs_resolution(node) {
        format!("{node}:7433")
    } else {
        node.to_owned()
    };
    (address, None)
}
708
709/// Build an authenticated GET request.
710fn authed_get(
711    client: &reqwest::Client,
712    url: String,
713    token: Option<&str>,
714) -> reqwest::RequestBuilder {
715    let req = client.get(url);
716    if let Some(t) = token {
717        req.bearer_auth(t)
718    } else {
719        req
720    }
721}
722
723/// Build an authenticated POST request.
724fn authed_post(
725    client: &reqwest::Client,
726    url: String,
727    token: Option<&str>,
728) -> reqwest::RequestBuilder {
729    let req = client.post(url);
730    if let Some(t) = token {
731        req.bearer_auth(t)
732    } else {
733        req
734    }
735}
736
737/// Build an authenticated DELETE request.
738fn authed_delete(
739    client: &reqwest::Client,
740    url: String,
741    token: Option<&str>,
742) -> reqwest::RequestBuilder {
743    let req = client.delete(url);
744    if let Some(t) = token {
745        req.bearer_auth(t)
746    } else {
747        req
748    }
749}
750
751/// Build an authenticated PUT request.
752#[cfg(not(coverage))]
753fn authed_put(
754    client: &reqwest::Client,
755    url: String,
756    token: Option<&str>,
757) -> reqwest::RequestBuilder {
758    let req = client.put(url);
759    if let Some(t) = token {
760        req.bearer_auth(t)
761    } else {
762        req
763    }
764}
765
766/// Fetch session output from the API.
767async fn fetch_output(
768    client: &reqwest::Client,
769    base: &str,
770    name: &str,
771    lines: usize,
772    token: Option<&str>,
773) -> Result<String> {
774    let resp = authed_get(
775        client,
776        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
777        token,
778    )
779    .send()
780    .await?;
781    let text = ok_or_api_error(resp).await?;
782    let output: OutputResponse = serde_json::from_str(&text)?;
783    Ok(output.output)
784}
785
786/// Fetch session status from the API.
787async fn fetch_session_status(
788    client: &reqwest::Client,
789    base: &str,
790    name: &str,
791    token: Option<&str>,
792) -> Result<String> {
793    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
794        .send()
795        .await?;
796    let text = ok_or_api_error(resp).await?;
797    let session: Session = serde_json::from_str(&text)?;
798    Ok(session.status.to_string())
799}
800
801/// Wait for the session to leave "creating" state, then check if it died instantly.
802/// Uses the session ID (not name) to avoid matching old stopped sessions with the same name.
803/// Returns an error with a helpful message if the session is lost/stopped.
804async fn check_session_alive(
805    client: &reqwest::Client,
806    base: &str,
807    session_id: &str,
808    token: Option<&str>,
809) -> Result<()> {
810    // Poll up to 3 times at 500ms intervals — handles slow daemons and Docker pull delays
811    for _ in 0..3 {
812        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
813        // Fetch by ID to avoid name collisions with old sessions
814        let resp = authed_get(
815            client,
816            format!("{base}/api/v1/sessions/{session_id}"),
817            token,
818        )
819        .send()
820        .await;
821        if let Ok(resp) = resp
822            && let Ok(text) = ok_or_api_error(resp).await
823            && let Ok(session) = serde_json::from_str::<Session>(&text)
824        {
825            match session.status.to_string().as_str() {
826                "creating" => continue,
827                "lost" | "stopped" => {
828                    anyhow::bail!(
829                        "Session \"{}\" exited immediately — the command may have failed.\n  Check logs: pulpo logs {}",
830                        session.name,
831                        session.name
832                    );
833                }
834                _ => return Ok(()),
835            }
836        }
837        // fetch failed — don't block, proceed to attach
838        break;
839    }
840    Ok(())
841}
842
/// Compute the new trailing lines that differ from the previous output.
///
/// The output endpoint returns the last N lines from the terminal pane. As new lines
/// appear, old lines at the top scroll off. We look for the tail of `prev` inside
/// `new` (scanning from the end of `new`) and return only what follows that
/// overlap, as a sub-slice of `new`.
///
/// - `prev` empty → all of `new`.
/// - `new` has no lines → `""`.
/// - Overlap ends at the last line of `new` → `""` (nothing new).
/// - No overlap found → all of `new` (output changed completely).
///
/// Lines are compared without their terminators, so `\n` and `\r\n` endings are
/// both handled; the returned slice always starts at a line boundary.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // A non-empty `prev` always yields at least one line; the fallback is defensive.
    let Some(&last_prev) = prev_lines.last() else {
        return new;
    };

    // Find the last line of prev in new to determine the overlap boundary.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Verify contiguous overlap: the lines leading up to the match must agree too.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail != new_overlap {
            continue;
        }
        if i + 1 == new_lines.len() {
            return "";
        }
        // Measure the consumed prefix from the raw segments (terminators included)
        // so that `\r\n` endings are counted correctly, not assumed to be one byte.
        let consumed: usize = new.split_inclusive('\n').take(i + 1).map(str::len).sum();
        return &new[consumed..];
    }

    // No overlap found — output changed completely, print it all
    new
}
885
886/// Follow logs by polling, printing only new output. Returns when the session ends.
887async fn follow_logs(
888    client: &reqwest::Client,
889    base: &str,
890    name: &str,
891    lines: usize,
892    token: Option<&str>,
893    writer: &mut (dyn std::io::Write + Send),
894) -> Result<()> {
895    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
896    write!(writer, "{prev_output}")?;
897
898    let mut unchanged_ticks: u32 = 0;
899
900    loop {
901        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
902
903        // Fetch latest output
904        let new_output = fetch_output(client, base, name, lines, token).await?;
905
906        let diff = diff_output(&prev_output, &new_output);
907        if diff.is_empty() {
908            unchanged_ticks += 1;
909        } else {
910            write!(writer, "{diff}")?;
911            unchanged_ticks = 0;
912        }
913
914        // Check for agent exit marker in output
915        if new_output.contains(AGENT_EXIT_MARKER) {
916            break;
917        }
918
919        prev_output = new_output;
920
921        // Only check session status when output has been unchanged for 3+ ticks
922        if unchanged_ticks >= 3 {
923            let status = fetch_session_status(client, base, name, token).await?;
924            let is_terminal = status == "ready" || status == "stopped" || status == "lost";
925            if is_terminal {
926                break;
927            }
928        }
929    }
930    Ok(())
931}
932
933// --- Schedule API ---
934
935/// Execute a schedule subcommand via the scheduler API.
936#[cfg(not(coverage))]
937async fn execute_schedule(
938    client: &reqwest::Client,
939    action: &ScheduleAction,
940    base: &str,
941    token: Option<&str>,
942) -> Result<String> {
943    match action {
944        ScheduleAction::Add {
945            name,
946            cron,
947            workdir,
948            node,
949            ink,
950            description,
951            command,
952        } => {
953            let cmd = if command.is_empty() {
954                None
955            } else {
956                Some(command.join(" "))
957            };
958            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
959                std::env::current_dir()
960                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
961            });
962            let mut body = serde_json::json!({
963                "name": name,
964                "cron": cron,
965                "workdir": resolved_workdir,
966            });
967            if let Some(c) = &cmd {
968                body["command"] = serde_json::json!(c);
969            }
970            if let Some(n) = node {
971                body["target_node"] = serde_json::json!(n);
972            }
973            if let Some(i) = ink {
974                body["ink"] = serde_json::json!(i);
975            }
976            if let Some(d) = description {
977                body["description"] = serde_json::json!(d);
978            }
979            let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
980                .json(&body)
981                .send()
982                .await?;
983            ok_or_api_error(resp).await?;
984            Ok(format!("Created schedule \"{name}\""))
985        }
986        ScheduleAction::List => {
987            let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
988                .send()
989                .await?;
990            let text = ok_or_api_error(resp).await?;
991            let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
992            Ok(format_schedules(&schedules))
993        }
994        ScheduleAction::Remove { name } => {
995            let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
996                .send()
997                .await?;
998            ok_or_api_error(resp).await?;
999            Ok(format!("Removed schedule \"{name}\""))
1000        }
1001        ScheduleAction::Pause { name } => {
1002            let body = serde_json::json!({ "enabled": false });
1003            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
1004                .json(&body)
1005                .send()
1006                .await?;
1007            ok_or_api_error(resp).await?;
1008            Ok(format!("Paused schedule \"{name}\""))
1009        }
1010        ScheduleAction::Resume { name } => {
1011            let body = serde_json::json!({ "enabled": true });
1012            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
1013                .json(&body)
1014                .send()
1015                .await?;
1016            ok_or_api_error(resp).await?;
1017            Ok(format!("Resumed schedule \"{name}\""))
1018        }
1019    }
1020}
1021
/// Stand-in for schedule execution, compiled only when the `coverage` cfg is set.
///
/// Every argument is ignored and no request is made; the result is always
/// an empty string wrapped in `Ok`.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_schedule(
    _client: &reqwest::Client,
    _action: &ScheduleAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    let nothing = String::default();
    Ok(nothing)
}
1033
1034// --- Secret API ---
1035
1036/// Format secret entries as a table.
1037#[cfg_attr(coverage, allow(dead_code))]
1038fn format_secrets(secrets: &[serde_json::Value]) -> String {
1039    if secrets.is_empty() {
1040        return "No secrets configured.".into();
1041    }
1042    let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
1043    for s in secrets {
1044        let name = s["name"].as_str().unwrap_or("?");
1045        let env_display = s["env"]
1046            .as_str()
1047            .map_or_else(|| name.to_owned(), String::from);
1048        let created = s["created_at"]
1049            .as_str()
1050            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1051        lines.push(format!("{name:<24} {env_display:<24} {created}"));
1052    }
1053    lines.join("\n")
1054}
1055
1056/// Execute a secret subcommand via the secrets API.
1057#[cfg(not(coverage))]
1058async fn execute_secret(
1059    client: &reqwest::Client,
1060    action: &SecretAction,
1061    base: &str,
1062    token: Option<&str>,
1063) -> Result<String> {
1064    match action {
1065        SecretAction::Set { name, value, env } => {
1066            let mut body = serde_json::json!({ "value": value });
1067            if let Some(e) = env {
1068                body["env"] = serde_json::json!(e);
1069            }
1070            let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
1071                .json(&body)
1072                .send()
1073                .await?;
1074            ok_or_api_error(resp).await?;
1075            Ok(format!("Secret \"{name}\" set."))
1076        }
1077        SecretAction::List => {
1078            let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
1079                .send()
1080                .await?;
1081            let text = ok_or_api_error(resp).await?;
1082            let parsed: serde_json::Value = serde_json::from_str(&text)?;
1083            let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1084            Ok(format_secrets(secrets))
1085        }
1086        SecretAction::Delete { name } => {
1087            let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1088                .send()
1089                .await?;
1090            ok_or_api_error(resp).await?;
1091            Ok(format!("Secret \"{name}\" deleted."))
1092        }
1093    }
1094}
1095
/// Stand-in for secret execution, compiled only when the `coverage` cfg is set.
///
/// Every argument is ignored and no request is made; the result is always
/// an empty string wrapped in `Ok`.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_secret(
    _client: &reqwest::Client,
    _action: &SecretAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    let nothing = String::default();
    Ok(nothing)
}
1107
1108/// Execute a worktree subcommand.
1109#[cfg(not(coverage))]
1110async fn execute_worktree(
1111    client: &reqwest::Client,
1112    action: &WorktreeAction,
1113    base: &str,
1114    token: Option<&str>,
1115) -> Result<String> {
1116    match action {
1117        WorktreeAction::List => {
1118            let resp = authed_get(client, format!("{base}/api/v1/sessions"), token)
1119                .send()
1120                .await?;
1121            let text = ok_or_api_error(resp).await?;
1122            let sessions: Vec<Session> = serde_json::from_str(&text)?;
1123            let wt_sessions: Vec<&Session> = sessions
1124                .iter()
1125                .filter(|s| s.worktree_path.is_some())
1126                .collect();
1127            Ok(format_worktree_sessions(&wt_sessions))
1128        }
1129    }
1130}
1131
/// Stand-in for worktree execution, compiled only when the `coverage` cfg is set.
///
/// Every argument is ignored and no request is made; the result is always
/// an empty string wrapped in `Ok`.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_worktree(
    _client: &reqwest::Client,
    _action: &WorktreeAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    let nothing = String::default();
    Ok(nothing)
}
1143
1144/// Format worktree sessions as a table.
1145fn format_worktree_sessions(sessions: &[&Session]) -> String {
1146    if sessions.is_empty() {
1147        return "No worktree sessions.".into();
1148    }
1149    let mut lines = vec![format!(
1150        "{:<20} {:<20} {:<10} {}",
1151        "NAME", "BRANCH", "STATUS", "PATH"
1152    )];
1153    for s in sessions {
1154        let branch = s.worktree_branch.as_deref().unwrap_or("-");
1155        let path = s.worktree_path.as_deref().unwrap_or("-");
1156        lines.push(format!(
1157            "{:<20} {:<20} {:<10} {}",
1158            s.name, branch, s.status, path
1159        ));
1160    }
1161    lines.join("\n")
1162}
1163
1164/// Format a list of schedules as a table.
1165#[cfg_attr(coverage, allow(dead_code))]
1166fn format_schedules(schedules: &[serde_json::Value]) -> String {
1167    if schedules.is_empty() {
1168        return "No schedules.".into();
1169    }
1170    let mut lines = vec![format!(
1171        "{:<20} {:<18} {:<8} {:<12} {}",
1172        "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1173    )];
1174    for s in schedules {
1175        let name = s["name"].as_str().unwrap_or("?");
1176        let cron = s["cron"].as_str().unwrap_or("?");
1177        let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1178            "yes"
1179        } else {
1180            "no"
1181        };
1182        let last_run = s["last_run_at"]
1183            .as_str()
1184            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1185        let node = s["target_node"].as_str().unwrap_or("local");
1186        lines.push(format!(
1187            "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1188        ));
1189    }
1190    lines.join("\n")
1191}
1192
1193/// Select the best node from the peer registry based on load.
1194/// Returns the node address and name of the least loaded online peer.
1195/// Scoring: lower memory usage + fewer active sessions = better.
1196#[cfg(not(coverage))]
1197async fn select_best_node(
1198    client: &reqwest::Client,
1199    base: &str,
1200    token: Option<&str>,
1201) -> Result<(String, String)> {
1202    let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1203        .send()
1204        .await?;
1205    let text = ok_or_api_error(resp).await?;
1206    let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1207
1208    // Score: fewer active sessions is better, more memory is better
1209    let mut best: Option<(String, String, f64)> = None; // (address, name, score)
1210
1211    for peer in &peers_resp.peers {
1212        if peer.status != pulpo_common::peer::PeerStatus::Online {
1213            continue;
1214        }
1215        let sessions = peer.session_count.unwrap_or(0);
1216        let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1217        // Lower score = better (fewer sessions, more memory)
1218        #[allow(clippy::cast_precision_loss)]
1219        let score = sessions as f64 - (mem as f64 / 1024.0);
1220        if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1221            best = Some((peer.address.clone(), peer.name.clone(), score));
1222        }
1223    }
1224
1225    // Fall back to local if no online peers
1226    match best {
1227        Some((addr, name, _)) => Ok((addr, name)),
1228        None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1229    }
1230}
1231
/// Stand-in for node auto-selection, compiled only when the `coverage` cfg is set.
///
/// Every argument is ignored and no request is made; the result is always
/// the pair `("localhost:7433", "local")`.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn select_best_node(
    _client: &reqwest::Client,
    _base: &str,
    _token: Option<&str>,
) -> Result<(String, String)> {
    let address = String::from("localhost:7433");
    let name = String::from("local");
    Ok((address, name))
}
1242
1243/// Try to start pulpod if it's not reachable on localhost.
1244/// Returns true if the daemon was started (or was already running).
1245#[cfg(not(coverage))]
1246async fn ensure_daemon_running(client: &reqwest::Client, url: &str, node: &str) -> bool {
1247    if !is_localhost(node) {
1248        return true; // Remote node — not our job to start
1249    }
1250    // Quick health check
1251    if client
1252        .get(format!("{url}/api/v1/health"))
1253        .timeout(std::time::Duration::from_secs(2))
1254        .send()
1255        .await
1256        .is_ok()
1257    {
1258        return true; // Already running
1259    }
1260
1261    eprintln!("pulpod is not running — starting it...");
1262
1263    // Try brew services first (macOS), then systemd (Linux), then direct spawn
1264    let started = if cfg!(target_os = "macos") {
1265        std::process::Command::new("brew")
1266            .args(["services", "start", "pulpo"])
1267            .stdout(std::process::Stdio::null())
1268            .stderr(std::process::Stdio::null())
1269            .status()
1270            .is_ok_and(|s| s.success())
1271    } else {
1272        std::process::Command::new("systemctl")
1273            .args(["--user", "start", "pulpo"])
1274            .stdout(std::process::Stdio::null())
1275            .stderr(std::process::Stdio::null())
1276            .status()
1277            .is_ok_and(|s| s.success())
1278    };
1279
1280    if !started {
1281        // Fallback: spawn pulpod directly in background
1282        if std::process::Command::new("pulpod")
1283            .stdout(std::process::Stdio::null())
1284            .stderr(std::process::Stdio::null())
1285            .spawn()
1286            .is_err()
1287        {
1288            eprintln!(
1289                "Failed to start pulpod. Install it with: brew install darioblanco/tap/pulpo"
1290            );
1291            return false;
1292        }
1293    }
1294
1295    // Wait for it to become reachable (up to 5 seconds)
1296    for _ in 0..10 {
1297        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1298        if client
1299            .get(format!("{url}/api/v1/health"))
1300            .timeout(std::time::Duration::from_secs(1))
1301            .send()
1302            .await
1303            .is_ok()
1304        {
1305            eprintln!("pulpod started.");
1306            return true;
1307        }
1308    }
1309    eprintln!("pulpod did not start in time.");
1310    false
1311}
1312
/// Coverage stub for daemon auto-start.
///
/// Compiled only when the `coverage` cfg is set. Ignores its arguments,
/// starts no process and makes no request; always reports `true`.
#[cfg(coverage)]
async fn ensure_daemon_running(_client: &reqwest::Client, _url: &str, _node: &str) -> bool {
    true
}
1318
/// Execute the given CLI command against the specified node.
///
/// Flow:
/// 1. Resolve `cli.node` via `resolve_node`, which also yields a peer token
///    used as a fallback when `resolve_token` finds none.
/// 2. Call `ensure_daemon_running` (its result is not inspected).
/// 3. With neither a subcommand nor a path, print the help text and return
///    an empty string.
/// 4. With only a path, create a session in that directory and attach to it.
/// 5. Otherwise dispatch on the subcommand.
///
/// Returns the text to print for the user.
///
/// # Errors
///
/// Propagates request failures (rewritten by `friendly_error` where the call
/// site maps them), API errors from `ok_or_api_error`, JSON parse failures,
/// and failures from `attach_session`, `check_session_alive`, `open_browser`
/// and printing the help text. `Attach` additionally fails when the session
/// status renders as `lost` or `stopped`.
#[allow(clippy::too_many_lines)]
pub async fn execute(cli: &Cli) -> Result<String> {
    let client = reqwest::Client::new();
    let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
    let url = base_url(&resolved_node);
    let node = &resolved_node;

    // Auto-start pulpod if it's not running on localhost.
    // The returned bool is deliberately not checked here.
    ensure_daemon_running(&client, &url, node).await;

    // Prefer the token from `resolve_token`; fall back to the peer token.
    let token = resolve_token(&client, &url, node, cli.token.as_deref())
        .await
        .or(peer_token);

    // Bare invocation: nothing to run.
    if cli.command.is_none() && cli.path.is_none() {
        // No subcommand and no path: print help
        use clap::CommandFactory;
        let mut cmd = Cli::command();
        cmd.print_help()?;
        println!();
        return Ok(String::new());
    }
    // Handle `pulpo <path>` shortcut — spawn a session in the given directory
    if cli.command.is_none() {
        let path = cli.path.as_deref().unwrap_or(".");
        let resolved_workdir = resolve_path(path);
        let base_name = derive_session_name(&resolved_workdir);
        let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
        let body = serde_json::json!({
            "name": name,
            "workdir": resolved_workdir,
        });
        let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
            .json(&body)
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
        let text = ok_or_api_error(resp).await?;
        let resp: CreateSessionResponse = serde_json::from_str(&text)?;
        let msg = format!(
            "Created session \"{}\" ({})",
            resp.session.name, resp.session.id
        );
        // Attach by backend session id when the server reports one, else by name.
        let backend_id = resp
            .session
            .backend_session_id
            .as_deref()
            .unwrap_or(&resp.session.name);
        eprintln!("{msg}");
        // Path shortcut spawns a shell (no command) — skip liveness check
        // since shell sessions are immediately detected as idle by the watchdog
        attach_session(backend_id)?;
        return Ok(format!("Detached from session \"{}\".", resp.session.name));
    }

    // Both `None` cases returned above, so the unwrap cannot fail here.
    match cli.command.as_ref().unwrap() {
        Commands::Attach { name } => {
            // Fetch session to get status and backend_session_id
            let resp = authed_get(
                &client,
                format!("{url}/api/v1/sessions/{name}"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let session: Session = serde_json::from_str(&text)?;
            // Refuse to attach to sessions whose status renders as lost or stopped.
            match session.status.to_string().as_str() {
                "lost" => {
                    anyhow::bail!(
                        "Session \"{name}\" is lost (agent process died). Resume it first:\n  pulpo resume {name}"
                    );
                }
                "stopped" => {
                    anyhow::bail!(
                        "Session \"{name}\" is {} — cannot attach to a stopped session.",
                        session.status
                    );
                }
                _ => {}
            }
            let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
            attach_session(&backend_id)?;
            Ok(format!("Detached from session {name}."))
        }
        Commands::Input { name, text } => {
            // With no text given, a single newline is sent.
            let input_text = text.as_deref().unwrap_or("\n");
            let body = serde_json::json!({ "text": input_text });
            let resp = authed_post(
                &client,
                format!("{url}/api/v1/sessions/{name}/input"),
                token.as_deref(),
            )
            .json(&body)
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            ok_or_api_error(resp).await?;
            Ok(format!("Sent input to session {name}."))
        }
        Commands::List { all } => {
            // Without `all`, restrict the listing via the `status` query filter.
            let list_url = if *all {
                format!("{url}/api/v1/sessions")
            } else {
                format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
            };
            let resp = authed_get(&client, list_url, token.as_deref())
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let sessions: Vec<Session> = serde_json::from_str(&text)?;
            Ok(format_sessions(&sessions))
        }
        Commands::Nodes => {
            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let resp: PeersResponse = serde_json::from_str(&text)?;
            Ok(format_nodes(&resp))
        }
        Commands::Spawn {
            workdir,
            name,
            ink,
            description,
            detach,
            idle_threshold,
            auto,
            worktree,
            worktree_base,
            runtime,
            secret,
            command,
        } => {
            // The trailing command words are joined with single spaces.
            let cmd = if command.is_empty() {
                None
            } else {
                Some(command.join(" "))
            };
            // Resolve workdir: --workdir flag > current directory
            // ("." if the current directory cannot be read)
            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
                std::env::current_dir()
                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
            });
            // Resolve name: explicit > derived from workdir (with dedup)
            let resolved_name = if let Some(n) = name {
                n.clone()
            } else {
                let base_name = derive_session_name(&resolved_workdir);
                deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
            };
            // Optional fields are only added to the request body when provided.
            let mut body = serde_json::json!({
                "name": resolved_name,
                "workdir": resolved_workdir,
            });
            if let Some(c) = &cmd {
                body["command"] = serde_json::json!(c);
            }
            if let Some(i) = ink {
                body["ink"] = serde_json::json!(i);
            }
            if let Some(d) = description {
                body["description"] = serde_json::json!(d);
            }
            if let Some(t) = idle_threshold {
                body["idle_threshold_secs"] = serde_json::json!(t);
            }
            // --base-branch implies --worktree
            if *worktree || worktree_base.is_some() {
                body["worktree"] = serde_json::json!(true);
                if let Some(base) = worktree_base {
                    body["worktree_base"] = serde_json::json!(base);
                    eprintln!(
                        "Worktree: branch {resolved_name} (from {base}) in ~/.pulpo/worktrees/{resolved_name}/"
                    );
                } else {
                    eprintln!(
                        "Worktree: branch {resolved_name} in ~/.pulpo/worktrees/{resolved_name}/"
                    );
                }
            }
            if let Some(rt) = runtime {
                body["runtime"] = serde_json::json!(rt);
            }
            if !secret.is_empty() {
                body["secrets"] = serde_json::json!(secret);
            }
            // With `auto`, the session is created on the node picked by
            // `select_best_node` instead of the resolved node.
            let spawn_url = if *auto {
                let (auto_addr, auto_name) =
                    select_best_node(&client, &url, token.as_deref()).await?;
                eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
                base_url(&auto_addr)
            } else {
                url.clone()
            };
            let resp = authed_post(
                &client,
                format!("{spawn_url}/api/v1/sessions"),
                token.as_deref(),
            )
            .json(&body)
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
            let msg = format!(
                "Created session \"{}\" ({})",
                resp.session.name, resp.session.id
            );
            if !detach {
                let backend_id = resp
                    .session
                    .backend_session_id
                    .as_deref()
                    .unwrap_or(&resp.session.name);
                eprintln!("{msg}");
                // Only check liveness for explicit commands — shell sessions (no command)
                // may be immediately marked idle/stopped by the watchdog, which is expected
                if cmd.is_some() {
                    let sid = resp.session.id.to_string();
                    // NOTE(review): this queries `url`, not `spawn_url`; with `auto`
                    // the session was created on `spawn_url` — confirm this is intended.
                    check_session_alive(&client, &url, &sid, token.as_deref()).await?;
                }
                attach_session(backend_id)?;
                return Ok(format!("Detached from session \"{}\".", resp.session.name));
            }
            Ok(msg)
        }
        Commands::Stop { names, purge } => {
            // One request per session. An API error for one session is recorded
            // in the output and the loop continues; a transport error aborts.
            let mut results = Vec::new();
            for name in names {
                let query = if *purge { "?purge=true" } else { "" };
                let resp = authed_post(
                    &client,
                    format!("{url}/api/v1/sessions/{name}/stop{query}"),
                    token.as_deref(),
                )
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
                let action = if *purge {
                    "stopped and purged"
                } else {
                    "stopped"
                };
                match ok_or_api_error(resp).await {
                    Ok(_) => results.push(format!("Session {name} {action}.")),
                    Err(e) => results.push(format!("Error stopping {name}: {e}")),
                }
            }
            Ok(results.join("\n"))
        }
        Commands::Cleanup => {
            let resp = authed_post(
                &client,
                format!("{url}/api/v1/sessions/cleanup"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let result: serde_json::Value = serde_json::from_str(&text)?;
            // A missing or non-numeric `deleted` field is treated as zero.
            let count = result["deleted"].as_u64().unwrap_or(0);
            if count == 0 {
                Ok("No stopped or lost sessions to clean up.".into())
            } else {
                Ok(format!("Cleaned up {count} session(s)."))
            }
        }
        Commands::Logs {
            name,
            lines,
            follow,
        } => {
            if *follow {
                // Output is written to stdout by `follow_logs`, so nothing is returned.
                let mut stdout = std::io::stdout();
                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
                    .await
                    .map_err(|e| {
                        // Unwrap reqwest errors to friendly messages
                        match e.downcast::<reqwest::Error>() {
                            Ok(re) => friendly_error(&re, node),
                            Err(other) => other,
                        }
                    })?;
                Ok(String::new())
            } else {
                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
                    .await
                    .map_err(|e| match e.downcast::<reqwest::Error>() {
                        Ok(re) => friendly_error(&re, node),
                        Err(other) => other,
                    })?;
                Ok(output)
            }
        }
        Commands::Interventions { name } => {
            let resp = authed_get(
                &client,
                format!("{url}/api/v1/sessions/{name}/interventions"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
            Ok(format_interventions(&events))
        }
        Commands::Ui => {
            // The dashboard address is the base URL of the resolved node.
            let dashboard = base_url(node);
            open_browser(&dashboard)?;
            Ok(format!("Opening {dashboard}"))
        }
        Commands::Resume { name } => {
            let resp = authed_post(
                &client,
                format!("{url}/api/v1/sessions/{name}/resume"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let session: Session = serde_json::from_str(&text)?;
            let backend_id = session
                .backend_session_id
                .as_deref()
                .unwrap_or(&session.name);
            eprintln!("Resumed session \"{}\"", session.name);
            let sid = session.id.to_string();
            check_session_alive(&client, &url, &sid, token.as_deref()).await?;
            attach_session(backend_id)?;
            Ok(format!("Detached from session \"{}\".", session.name))
        }
        // The remaining subcommands delegate to helpers; reqwest errors they
        // return are converted to friendly messages, other errors pass through.
        Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
            .await
            .map_err(|e| match e.downcast::<reqwest::Error>() {
                Ok(re) => friendly_error(&re, node),
                Err(other) => other,
            }),
        Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
            .await
            .map_err(|e| match e.downcast::<reqwest::Error>() {
                Ok(re) => friendly_error(&re, node),
                Err(other) => other,
            }),
        Commands::Worktree { action } => execute_worktree(&client, action, &url, token.as_deref())
            .await
            .map_err(|e| match e.downcast::<reqwest::Error>() {
                Ok(re) => friendly_error(&re, node),
                Err(other) => other,
            }),
    }
}
1680
1681#[cfg(test)]
1682mod tests {
1683    use super::*;
1684
1685    /// Helper to build a minimal `Session` for `format_branch` tests.
1686    fn repo_session(workdir: &str, branch: Option<&str>) -> Session {
1687        Session {
1688            id: uuid::Uuid::nil(),
1689            name: "test".into(),
1690            workdir: workdir.into(),
1691            command: "echo".into(),
1692            description: None,
1693            status: pulpo_common::session::SessionStatus::Active,
1694            exit_code: None,
1695            backend_session_id: None,
1696            output_snapshot: None,
1697            metadata: None,
1698            ink: None,
1699            intervention_code: None,
1700            intervention_reason: None,
1701            intervention_at: None,
1702            last_output_at: None,
1703            idle_since: None,
1704            idle_threshold_secs: None,
1705            worktree_path: None,
1706            worktree_branch: None,
1707            git_branch: branch.map(Into::into),
1708            git_commit: None,
1709            git_files_changed: None,
1710            git_insertions: None,
1711            git_deletions: None,
1712            git_ahead: None,
1713            runtime: Runtime::Tmux,
1714            created_at: chrono::Utc::now(),
1715            updated_at: chrono::Utc::now(),
1716        }
1717    }
1718
1719    #[test]
1720    fn test_base_url() {
1721        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1722        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1723    }
1724
1725    #[test]
1726    fn test_cli_parse_list() {
1727        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1728        assert_eq!(cli.node, "localhost:7433");
1729        assert!(matches!(cli.command, Some(Commands::List { .. })));
1730    }
1731
1732    #[test]
1733    fn test_cli_parse_nodes() {
1734        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1735        assert!(matches!(cli.command, Some(Commands::Nodes)));
1736    }
1737
1738    #[test]
1739    fn test_cli_parse_ui() {
1740        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1741        assert!(matches!(cli.command, Some(Commands::Ui)));
1742    }
1743
1744    #[test]
1745    fn test_cli_parse_ui_custom_node() {
1746        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1747        // With args_conflicts_with_subcommands, "ui" is parsed as path when --node is explicit
1748        assert_eq!(cli.node, "mac-mini:7433");
1749        assert_eq!(cli.path.as_deref(), Some("ui"));
1750    }
1751
1752    #[test]
1753    fn test_build_open_command() {
1754        let cmd = build_open_command("http://localhost:7433");
1755        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1756        assert_eq!(args, vec!["http://localhost:7433"]);
1757        #[cfg(target_os = "macos")]
1758        assert_eq!(cmd.get_program(), "open");
1759        #[cfg(target_os = "linux")]
1760        assert_eq!(cmd.get_program(), "xdg-open");
1761    }
1762
1763    #[test]
1764    fn test_cli_parse_spawn() {
1765        let cli = Cli::try_parse_from([
1766            "pulpo",
1767            "spawn",
1768            "my-task",
1769            "--workdir",
1770            "/tmp/repo",
1771            "--",
1772            "claude",
1773            "-p",
1774            "Fix the bug",
1775        ])
1776        .unwrap();
1777        assert!(matches!(
1778            &cli.command,
1779            Some(Commands::Spawn { name, workdir, command, .. })
1780                if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1781                && command == &["claude", "-p", "Fix the bug"]
1782        ));
1783    }
1784
1785    #[test]
1786    fn test_cli_parse_spawn_with_ink() {
1787        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1788        assert!(matches!(
1789            &cli.command,
1790            Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1791        ));
1792    }
1793
1794    #[test]
1795    fn test_cli_parse_spawn_with_description() {
1796        let cli =
1797            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1798                .unwrap();
1799        assert!(matches!(
1800            &cli.command,
1801            Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1802        ));
1803    }
1804
1805    #[test]
1806    fn test_cli_parse_spawn_name_positional() {
1807        let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1808        assert!(matches!(
1809            &cli.command,
1810            Some(Commands::Spawn { name, command, .. })
1811                if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1812        ));
1813    }
1814
1815    #[test]
1816    fn test_cli_parse_spawn_no_command() {
1817        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1818        assert!(matches!(
1819            &cli.command,
1820            Some(Commands::Spawn { command, .. }) if command.is_empty()
1821        ));
1822    }
1823
1824    #[test]
1825    fn test_cli_parse_spawn_idle_threshold() {
1826        let cli =
1827            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1828        assert!(matches!(
1829            &cli.command,
1830            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1831        ));
1832    }
1833
1834    #[test]
1835    fn test_cli_parse_spawn_idle_threshold_60() {
1836        let cli =
1837            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "60"]).unwrap();
1838        assert!(matches!(
1839            &cli.command,
1840            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(60)
1841        ));
1842    }
1843
1844    #[test]
1845    fn test_cli_parse_spawn_secrets() {
1846        let cli = Cli::try_parse_from([
1847            "pulpo",
1848            "spawn",
1849            "my-task",
1850            "--secret",
1851            "GITHUB_TOKEN",
1852            "--secret",
1853            "NPM_TOKEN",
1854        ])
1855        .unwrap();
1856        assert!(matches!(
1857            &cli.command,
1858            Some(Commands::Spawn { secret, .. }) if secret == &["GITHUB_TOKEN", "NPM_TOKEN"]
1859        ));
1860    }
1861
1862    #[test]
1863    fn test_cli_parse_spawn_no_secrets() {
1864        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1865        assert!(matches!(
1866            &cli.command,
1867            Some(Commands::Spawn { secret, .. }) if secret.is_empty()
1868        ));
1869    }
1870
1871    #[test]
1872    fn test_cli_parse_secret_set_with_env() {
1873        let cli = Cli::try_parse_from([
1874            "pulpo",
1875            "secret",
1876            "set",
1877            "GH_WORK",
1878            "token123",
1879            "--env",
1880            "GITHUB_TOKEN",
1881        ])
1882        .unwrap();
1883        assert!(matches!(
1884            &cli.command,
1885            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1886                if name == "GH_WORK" && value == "token123" && env.as_deref() == Some("GITHUB_TOKEN")
1887        ));
1888    }
1889
1890    #[test]
1891    fn test_cli_parse_secret_set_without_env() {
1892        let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_KEY", "val"]).unwrap();
1893        assert!(matches!(
1894            &cli.command,
1895            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1896                if name == "MY_KEY" && value == "val" && env.is_none()
1897        ));
1898    }
1899
1900    #[test]
1901    fn test_cli_parse_spawn_worktree() {
1902        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree"]).unwrap();
1903        assert!(matches!(
1904            &cli.command,
1905            Some(Commands::Spawn { worktree, .. }) if *worktree
1906        ));
1907    }
1908
1909    #[test]
1910    fn test_cli_parse_spawn_worktree_base() {
1911        let cli = Cli::try_parse_from([
1912            "pulpo",
1913            "spawn",
1914            "my-task",
1915            "--worktree",
1916            "--worktree-base",
1917            "main",
1918        ])
1919        .unwrap();
1920        assert!(matches!(
1921            &cli.command,
1922            Some(Commands::Spawn { worktree, worktree_base, .. })
1923                if *worktree && worktree_base.as_deref() == Some("main")
1924        ));
1925    }
1926
1927    #[test]
1928    fn test_cli_parse_spawn_worktree_base_implies_worktree() {
1929        // --worktree-base without --worktree should still parse (implied at execute time)
1930        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree-base", "develop"])
1931            .unwrap();
1932        assert!(matches!(
1933            &cli.command,
1934            Some(Commands::Spawn { worktree_base, .. })
1935                if worktree_base.as_deref() == Some("develop")
1936        ));
1937    }
1938
1939    #[test]
1940    fn test_cli_parse_worktree_list() {
1941        let cli = Cli::try_parse_from(["pulpo", "worktree", "list"]).unwrap();
1942        assert!(matches!(
1943            &cli.command,
1944            Some(Commands::Worktree {
1945                action: WorktreeAction::List
1946            })
1947        ));
1948    }
1949
1950    #[test]
1951    fn test_cli_parse_wt_alias() {
1952        let cli = Cli::try_parse_from(["pulpo", "wt", "list"]).unwrap();
1953        assert!(matches!(
1954            &cli.command,
1955            Some(Commands::Worktree {
1956                action: WorktreeAction::List
1957            })
1958        ));
1959    }
1960
1961    #[test]
1962    fn test_cli_parse_worktree_list_ls_alias() {
1963        let cli = Cli::try_parse_from(["pulpo", "wt", "ls"]).unwrap();
1964        assert!(matches!(
1965            &cli.command,
1966            Some(Commands::Worktree {
1967                action: WorktreeAction::List
1968            })
1969        ));
1970    }
1971
1972    #[test]
1973    fn test_format_worktree_sessions_empty() {
1974        let output = format_worktree_sessions(&[]);
1975        assert_eq!(output, "No worktree sessions.");
1976    }
1977
1978    #[test]
1979    fn test_format_worktree_sessions_with_data() {
1980        use chrono::Utc;
1981        use pulpo_common::session::SessionStatus;
1982        use uuid::Uuid;
1983
1984        let session = Session {
1985            id: Uuid::nil(),
1986            name: "fix-auth".into(),
1987            workdir: "/tmp/repo".into(),
1988            command: "claude -p 'fix auth'".into(),
1989            description: None,
1990            status: SessionStatus::Active,
1991            exit_code: None,
1992            backend_session_id: None,
1993            output_snapshot: None,
1994            metadata: None,
1995            ink: None,
1996            intervention_code: None,
1997            intervention_reason: None,
1998            intervention_at: None,
1999            last_output_at: None,
2000            idle_since: None,
2001            idle_threshold_secs: None,
2002            worktree_path: Some("/home/user/.pulpo/worktrees/fix-auth".into()),
2003            worktree_branch: Some("fix-auth".into()),
2004            git_branch: None,
2005            git_commit: None,
2006            git_files_changed: None,
2007            git_insertions: None,
2008            git_deletions: None,
2009            git_ahead: None,
2010            runtime: Runtime::Tmux,
2011            created_at: Utc::now(),
2012            updated_at: Utc::now(),
2013        };
2014        let sessions = vec![&session];
2015        let output = format_worktree_sessions(&sessions);
2016        assert!(output.contains("fix-auth"), "should show name: {output}");
2017        assert!(output.contains("active"), "should show status: {output}");
2018        assert!(
2019            output.contains("/home/user/.pulpo/worktrees/fix-auth"),
2020            "should show path: {output}"
2021        );
2022        assert!(output.contains("BRANCH"), "should have header: {output}");
2023    }
2024
2025    #[test]
2026    fn test_format_worktree_sessions_no_branch() {
2027        use chrono::Utc;
2028        use pulpo_common::session::SessionStatus;
2029        use uuid::Uuid;
2030
2031        let session = Session {
2032            id: Uuid::nil(),
2033            name: "old-session".into(),
2034            workdir: "/tmp".into(),
2035            command: "echo".into(),
2036            description: None,
2037            status: SessionStatus::Active,
2038            exit_code: None,
2039            backend_session_id: None,
2040            output_snapshot: None,
2041            metadata: None,
2042            ink: None,
2043            intervention_code: None,
2044            intervention_reason: None,
2045            intervention_at: None,
2046            last_output_at: None,
2047            idle_since: None,
2048            idle_threshold_secs: None,
2049            worktree_path: Some("/home/user/.pulpo/worktrees/old-session".into()),
2050            worktree_branch: None,
2051            git_branch: None,
2052            git_commit: None,
2053            git_files_changed: None,
2054            git_insertions: None,
2055            git_deletions: None,
2056            git_ahead: None,
2057            runtime: Runtime::Tmux,
2058            created_at: Utc::now(),
2059            updated_at: Utc::now(),
2060        };
2061        let sessions = vec![&session];
2062        let output = format_worktree_sessions(&sessions);
2063        assert!(
2064            output.contains('-'),
2065            "branch should show dash when None: {output}"
2066        );
2067    }
2068
2069    #[test]
2070    fn test_cli_parse_spawn_detach() {
2071        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
2072        assert!(matches!(
2073            &cli.command,
2074            Some(Commands::Spawn { detach, .. }) if *detach
2075        ));
2076    }
2077
2078    #[test]
2079    fn test_cli_parse_spawn_detach_short() {
2080        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
2081        assert!(matches!(
2082            &cli.command,
2083            Some(Commands::Spawn { detach, .. }) if *detach
2084        ));
2085    }
2086
2087    #[test]
2088    fn test_cli_parse_spawn_detach_default() {
2089        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
2090        assert!(matches!(
2091            &cli.command,
2092            Some(Commands::Spawn { detach, .. }) if !detach
2093        ));
2094    }
2095
2096    #[test]
2097    fn test_cli_parse_logs() {
2098        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
2099        assert!(matches!(
2100            &cli.command,
2101            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 100 && !follow
2102        ));
2103    }
2104
2105    #[test]
2106    fn test_cli_parse_logs_with_lines() {
2107        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
2108        assert!(matches!(
2109            &cli.command,
2110            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 50 && !follow
2111        ));
2112    }
2113
2114    #[test]
2115    fn test_cli_parse_logs_follow() {
2116        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
2117        assert!(matches!(
2118            &cli.command,
2119            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
2120        ));
2121    }
2122
2123    #[test]
2124    fn test_cli_parse_logs_follow_short() {
2125        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
2126        assert!(matches!(
2127            &cli.command,
2128            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
2129        ));
2130    }
2131
2132    #[test]
2133    fn test_cli_parse_stop() {
2134        let cli = Cli::try_parse_from(["pulpo", "stop", "my-session"]).unwrap();
2135        assert!(matches!(
2136            &cli.command,
2137            Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
2138        ));
2139    }
2140
2141    #[test]
2142    fn test_cli_parse_stop_purge() {
2143        let cli = Cli::try_parse_from(["pulpo", "stop", "my-session", "--purge"]).unwrap();
2144        assert!(matches!(
2145            &cli.command,
2146            Some(Commands::Stop { names, purge }) if names == &["my-session"] && *purge
2147        ));
2148
2149        let cli = Cli::try_parse_from(["pulpo", "stop", "my-session", "-p"]).unwrap();
2150        assert!(matches!(
2151            &cli.command,
2152            Some(Commands::Stop { names, purge }) if names == &["my-session"] && *purge
2153        ));
2154    }
2155
2156    #[test]
2157    fn test_cli_parse_kill_alias() {
2158        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
2159        assert!(matches!(
2160            &cli.command,
2161            Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
2162        ));
2163    }
2164
2165    #[test]
2166    fn test_cli_parse_resume() {
2167        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
2168        assert!(matches!(
2169            &cli.command,
2170            Some(Commands::Resume { name }) if name == "my-session"
2171        ));
2172    }
2173
2174    #[test]
2175    fn test_cli_parse_input() {
2176        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
2177        assert!(matches!(
2178            &cli.command,
2179            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("yes")
2180        ));
2181    }
2182
2183    #[test]
2184    fn test_cli_parse_input_no_text() {
2185        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
2186        assert!(matches!(
2187            &cli.command,
2188            Some(Commands::Input { name, text }) if name == "my-session" && text.is_none()
2189        ));
2190    }
2191
2192    #[test]
2193    fn test_cli_parse_input_alias() {
2194        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
2195        assert!(matches!(
2196            &cli.command,
2197            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
2198        ));
2199    }
2200
2201    #[test]
2202    fn test_cli_parse_custom_node() {
2203        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
2204        assert_eq!(cli.node, "win-pc:8080");
2205        // With args_conflicts_with_subcommands, "list" is parsed as path when --node is explicit
2206        assert_eq!(cli.path.as_deref(), Some("list"));
2207    }
2208
2209    #[test]
2210    fn test_cli_version() {
2211        let result = Cli::try_parse_from(["pulpo", "--version"]);
2212        // clap exits with an error (kind DisplayVersion) when --version is used
2213        let err = result.unwrap_err();
2214        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
2215    }
2216
2217    #[test]
2218    fn test_cli_parse_no_subcommand_succeeds() {
2219        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
2220        assert!(cli.command.is_none());
2221        assert!(cli.path.is_none());
2222    }
2223
2224    #[test]
2225    fn test_cli_debug() {
2226        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2227        let debug = format!("{cli:?}");
2228        assert!(debug.contains("List"));
2229    }
2230
2231    #[test]
2232    fn test_commands_debug() {
2233        let cmd = Commands::List { all: false };
2234        assert_eq!(format!("{cmd:?}"), "List { all: false }");
2235    }
2236
2237    /// A valid Session JSON for test responses.
2238    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2239
2240    /// A valid `CreateSessionResponse` JSON wrapping the session.
2241    fn test_create_response_json() -> String {
2242        format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
2243    }
2244
2245    /// Start a lightweight test HTTP server and return its address.
2246    async fn start_test_server() -> String {
2247        use axum::http::StatusCode;
2248        use axum::{
2249            Json, Router,
2250            routing::{get, post},
2251        };
2252
2253        let create_json = test_create_response_json();
2254
2255        let app = Router::new()
2256            .route(
2257                "/api/v1/sessions",
2258                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
2259                    (StatusCode::CREATED, create_json.clone())
2260                }),
2261            )
2262            .route(
2263                "/api/v1/sessions/{id}",
2264                get(|| async { TEST_SESSION_JSON.to_owned() }),
2265            )
2266            .route(
2267                "/api/v1/sessions/{id}/stop",
2268                post(|| async { StatusCode::NO_CONTENT }),
2269            )
2270            .route(
2271                "/api/v1/sessions/{id}/output",
2272                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
2273            )
2274            .route(
2275                "/api/v1/peers",
2276                get(|| async {
2277                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
2278                }),
2279            )
2280            .route(
2281                "/api/v1/sessions/{id}/resume",
2282                axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
2283            )
2284            .route(
2285                "/api/v1/sessions/{id}/interventions",
2286                get(|| async { "[]".to_owned() }),
2287            )
2288            .route(
2289                "/api/v1/sessions/{id}/input",
2290                post(|| async { StatusCode::NO_CONTENT }),
2291            )
2292            .route(
2293                "/api/v1/schedules",
2294                get(|| async { Json::<Vec<()>>(vec![]) })
2295                    .post(|| async { StatusCode::CREATED }),
2296            )
2297            .route(
2298                "/api/v1/schedules/{id}",
2299                axum::routing::put(|| async { StatusCode::OK })
2300                    .delete(|| async { StatusCode::NO_CONTENT }),
2301            );
2302
2303        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2304        let addr = listener.local_addr().unwrap();
2305        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2306        format!("127.0.0.1:{}", addr.port())
2307    }
2308
2309    #[tokio::test]
2310    async fn test_execute_list_success() {
2311        let node = start_test_server().await;
2312        let cli = Cli {
2313            node,
2314            token: None,
2315            command: Some(Commands::List { all: false }),
2316            path: None,
2317        };
2318        let result = execute(&cli).await.unwrap();
2319        assert_eq!(result, "No sessions.");
2320    }
2321
2322    #[tokio::test]
2323    async fn test_execute_nodes_success() {
2324        let node = start_test_server().await;
2325        let cli = Cli {
2326            node,
2327            token: None,
2328            command: Some(Commands::Nodes),
2329            path: None,
2330        };
2331        let result = execute(&cli).await.unwrap();
2332        assert!(result.contains("test"));
2333        assert!(result.contains("(local)"));
2334        assert!(result.contains("NAME"));
2335    }
2336
2337    #[tokio::test]
2338    async fn test_execute_spawn_success() {
2339        let node = start_test_server().await;
2340        let cli = Cli {
2341            node,
2342            token: None,
2343            command: Some(Commands::Spawn {
2344                name: Some("test".into()),
2345                workdir: Some("/tmp/repo".into()),
2346                ink: None,
2347                description: None,
2348                detach: true,
2349                idle_threshold: None,
2350                auto: false,
2351                worktree: false,
2352                worktree_base: None,
2353                runtime: None,
2354                secret: vec![],
2355                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2356            }),
2357            path: None,
2358        };
2359        let result = execute(&cli).await.unwrap();
2360        assert!(result.contains("Created session"));
2361        assert!(result.contains("repo"));
2362    }
2363
2364    #[tokio::test]
2365    async fn test_execute_spawn_with_all_flags() {
2366        let node = start_test_server().await;
2367        let cli = Cli {
2368            node,
2369            token: None,
2370            command: Some(Commands::Spawn {
2371                name: Some("test".into()),
2372                workdir: Some("/tmp/repo".into()),
2373                ink: Some("coder".into()),
2374                description: Some("Fix the bug".into()),
2375                detach: true,
2376                idle_threshold: None,
2377                auto: false,
2378                worktree: false,
2379                worktree_base: None,
2380                runtime: None,
2381                secret: vec![],
2382                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2383            }),
2384            path: None,
2385        };
2386        let result = execute(&cli).await.unwrap();
2387        assert!(result.contains("Created session"));
2388    }
2389
2390    #[tokio::test]
2391    async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
2392        let node = start_test_server().await;
2393        let cli = Cli {
2394            node,
2395            token: None,
2396            command: Some(Commands::Spawn {
2397                name: Some("full-opts".into()),
2398                workdir: Some("/tmp/repo".into()),
2399                ink: Some("coder".into()),
2400                description: Some("Full options".into()),
2401                detach: true,
2402                idle_threshold: Some(120),
2403                auto: false,
2404                worktree: true,
2405                worktree_base: None,
2406                runtime: Some("docker".into()),
2407                secret: vec![],
2408                command: vec!["claude".into()],
2409            }),
2410            path: None,
2411        };
2412        let result = execute(&cli).await.unwrap();
2413        assert!(result.contains("Created session"));
2414    }
2415
2416    #[tokio::test]
2417    async fn test_execute_spawn_no_name_derives_from_workdir() {
2418        let node = start_test_server().await;
2419        let cli = Cli {
2420            node,
2421            token: None,
2422            command: Some(Commands::Spawn {
2423                name: None,
2424                workdir: Some("/tmp/my-project".into()),
2425                ink: None,
2426                description: None,
2427                detach: true,
2428                idle_threshold: None,
2429                auto: false,
2430                worktree: false,
2431                worktree_base: None,
2432                runtime: None,
2433                secret: vec![],
2434                command: vec!["echo".into(), "hello".into()],
2435            }),
2436            path: None,
2437        };
2438        let result = execute(&cli).await.unwrap();
2439        assert!(result.contains("Created session"));
2440    }
2441
2442    #[tokio::test]
2443    async fn test_execute_spawn_no_command() {
2444        let node = start_test_server().await;
2445        let cli = Cli {
2446            node,
2447            token: None,
2448            command: Some(Commands::Spawn {
2449                name: Some("test".into()),
2450                workdir: Some("/tmp/repo".into()),
2451                ink: None,
2452                description: None,
2453                detach: true,
2454                idle_threshold: None,
2455                auto: false,
2456                worktree: false,
2457                worktree_base: None,
2458                runtime: None,
2459                secret: vec![],
2460                command: vec![],
2461            }),
2462            path: None,
2463        };
2464        let result = execute(&cli).await.unwrap();
2465        assert!(result.contains("Created session"));
2466    }
2467
2468    #[tokio::test]
2469    async fn test_execute_spawn_with_name() {
2470        let node = start_test_server().await;
2471        let cli = Cli {
2472            node,
2473            token: None,
2474            command: Some(Commands::Spawn {
2475                name: Some("my-task".into()),
2476                workdir: Some("/tmp/repo".into()),
2477                ink: None,
2478                description: None,
2479                detach: true,
2480                idle_threshold: None,
2481                auto: false,
2482                worktree: false,
2483                worktree_base: None,
2484                runtime: None,
2485                secret: vec![],
2486                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2487            }),
2488            path: None,
2489        };
2490        let result = execute(&cli).await.unwrap();
2491        assert!(result.contains("Created session"));
2492    }
2493
2494    #[tokio::test]
2495    async fn test_execute_spawn_auto_attach() {
2496        let node = start_test_server().await;
2497        let cli = Cli {
2498            node,
2499            token: None,
2500            command: Some(Commands::Spawn {
2501                name: Some("test".into()),
2502                workdir: Some("/tmp/repo".into()),
2503                ink: None,
2504                description: None,
2505                detach: false,
2506                idle_threshold: None,
2507                auto: false,
2508                worktree: false,
2509                worktree_base: None,
2510                runtime: None,
2511                secret: vec![],
2512                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2513            }),
2514            path: None,
2515        };
2516        let result = execute(&cli).await.unwrap();
2517        // When not detached, spawn prints creation to stderr and returns detach message
2518        assert!(result.contains("Detached from session"));
2519    }
2520
2521    #[tokio::test]
2522    async fn test_execute_stop_success() {
2523        let node = start_test_server().await;
2524        let cli = Cli {
2525            node,
2526            token: None,
2527            command: Some(Commands::Stop {
2528                names: vec!["test-session".into()],
2529                purge: false,
2530            }),
2531            path: None,
2532        };
2533        let result = execute(&cli).await.unwrap();
2534        assert!(result.contains("stopped"));
2535        assert!(!result.contains("purged"));
2536    }
2537
2538    #[tokio::test]
2539    async fn test_execute_stop_with_purge() {
2540        let node = start_test_server().await;
2541        let cli = Cli {
2542            node,
2543            token: None,
2544            command: Some(Commands::Stop {
2545                names: vec!["test-session".into()],
2546                purge: true,
2547            }),
2548            path: None,
2549        };
2550        let result = execute(&cli).await.unwrap();
2551        assert!(result.contains("stopped and purged"));
2552    }
2553
2554    #[tokio::test]
2555    async fn test_execute_logs_success() {
2556        let node = start_test_server().await;
2557        let cli = Cli {
2558            node,
2559            token: None,
2560            command: Some(Commands::Logs {
2561                name: "test-session".into(),
2562                lines: 50,
2563                follow: false,
2564            }),
2565            path: None,
2566        };
2567        let result = execute(&cli).await.unwrap();
2568        assert!(result.contains("test output"));
2569    }
2570
2571    #[tokio::test]
2572    async fn test_execute_list_connection_refused() {
2573        let cli = Cli {
2574            node: "localhost:1".into(),
2575            token: None,
2576            command: Some(Commands::List { all: false }),
2577            path: None,
2578        };
2579        let result = execute(&cli).await;
2580        let err = result.unwrap_err().to_string();
2581        assert!(
2582            err.contains("Could not connect to pulpod"),
2583            "Expected friendly error, got: {err}"
2584        );
2585        assert!(err.contains("localhost:1"));
2586    }
2587
2588    #[tokio::test]
2589    async fn test_execute_nodes_connection_refused() {
2590        let cli = Cli {
2591            node: "localhost:1".into(),
2592            token: None,
2593            command: Some(Commands::Nodes),
2594            path: None,
2595        };
2596        let result = execute(&cli).await;
2597        let err = result.unwrap_err().to_string();
2598        assert!(err.contains("Could not connect to pulpod"));
2599    }
2600
2601    #[tokio::test]
2602    async fn test_execute_stop_error_response() {
2603        use axum::{Router, http::StatusCode, routing::post};
2604
2605        let app = Router::new().route(
2606            "/api/v1/sessions/{id}/stop",
2607            post(|| async {
2608                (
2609                    StatusCode::NOT_FOUND,
2610                    "{\"error\":\"session not found: test-session\"}",
2611                )
2612            }),
2613        );
2614        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2615        let addr = listener.local_addr().unwrap();
2616        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2617        let node = format!("127.0.0.1:{}", addr.port());
2618
2619        let cli = Cli {
2620            node,
2621            token: None,
2622            command: Some(Commands::Stop {
2623                names: vec!["test-session".into()],
2624                purge: false,
2625            }),
2626            path: None,
2627        };
2628        let result = execute(&cli).await.unwrap();
2629        assert!(result.contains("Error stopping test-session"), "{result}");
2630    }
2631
2632    #[tokio::test]
2633    async fn test_execute_logs_error_response() {
2634        use axum::{Router, http::StatusCode, routing::get};
2635
2636        let app = Router::new().route(
2637            "/api/v1/sessions/{id}/output",
2638            get(|| async {
2639                (
2640                    StatusCode::NOT_FOUND,
2641                    "{\"error\":\"session not found: ghost\"}",
2642                )
2643            }),
2644        );
2645        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2646        let addr = listener.local_addr().unwrap();
2647        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2648        let node = format!("127.0.0.1:{}", addr.port());
2649
2650        let cli = Cli {
2651            node,
2652            token: None,
2653            command: Some(Commands::Logs {
2654                name: "ghost".into(),
2655                lines: 50,
2656                follow: false,
2657            }),
2658            path: None,
2659        };
2660        let err = execute(&cli).await.unwrap_err();
2661        assert_eq!(err.to_string(), "session not found: ghost");
2662    }
2663
2664    #[tokio::test]
2665    async fn test_execute_resume_error_response() {
2666        use axum::{Router, http::StatusCode, routing::post};
2667
2668        let app = Router::new().route(
2669            "/api/v1/sessions/{id}/resume",
2670            post(|| async {
2671                (
2672                    StatusCode::BAD_REQUEST,
2673                    "{\"error\":\"session is not lost (status: active)\"}",
2674                )
2675            }),
2676        );
2677        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2678        let addr = listener.local_addr().unwrap();
2679        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2680        let node = format!("127.0.0.1:{}", addr.port());
2681
2682        let cli = Cli {
2683            node,
2684            token: None,
2685            command: Some(Commands::Resume {
2686                name: "test-session".into(),
2687            }),
2688            path: None,
2689        };
2690        let err = execute(&cli).await.unwrap_err();
2691        assert_eq!(err.to_string(), "session is not lost (status: active)");
2692    }
2693
2694    #[tokio::test]
2695    async fn test_execute_spawn_error_response() {
2696        use axum::{Router, http::StatusCode, routing::post};
2697
2698        let app = Router::new().route(
2699            "/api/v1/sessions",
2700            post(|| async {
2701                (
2702                    StatusCode::INTERNAL_SERVER_ERROR,
2703                    "{\"error\":\"failed to spawn session\"}",
2704                )
2705            }),
2706        );
2707        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2708        let addr = listener.local_addr().unwrap();
2709        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2710        let node = format!("127.0.0.1:{}", addr.port());
2711
2712        let cli = Cli {
2713            node,
2714            token: None,
2715            command: Some(Commands::Spawn {
2716                name: Some("test".into()),
2717                workdir: Some("/tmp/repo".into()),
2718                ink: None,
2719                description: None,
2720                detach: true,
2721                idle_threshold: None,
2722                auto: false,
2723                worktree: false,
2724                worktree_base: None,
2725                runtime: None,
2726                secret: vec![],
2727                command: vec!["test".into()],
2728            }),
2729            path: None,
2730        };
2731        let err = execute(&cli).await.unwrap_err();
2732        assert_eq!(err.to_string(), "failed to spawn session");
2733    }
2734
2735    #[tokio::test]
2736    async fn test_execute_interventions_error_response() {
2737        use axum::{Router, http::StatusCode, routing::get};
2738
2739        let app = Router::new().route(
2740            "/api/v1/sessions/{id}/interventions",
2741            get(|| async {
2742                (
2743                    StatusCode::NOT_FOUND,
2744                    "{\"error\":\"session not found: ghost\"}",
2745                )
2746            }),
2747        );
2748        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2749        let addr = listener.local_addr().unwrap();
2750        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2751        let node = format!("127.0.0.1:{}", addr.port());
2752
2753        let cli = Cli {
2754            node,
2755            token: None,
2756            command: Some(Commands::Interventions {
2757                name: "ghost".into(),
2758            }),
2759            path: None,
2760        };
2761        let err = execute(&cli).await.unwrap_err();
2762        assert_eq!(err.to_string(), "session not found: ghost");
2763    }
2764
2765    #[tokio::test]
2766    async fn test_execute_resume_success() {
2767        let node = start_test_server().await;
2768        let cli = Cli {
2769            node,
2770            token: None,
2771            command: Some(Commands::Resume {
2772                name: "test-session".into(),
2773            }),
2774            path: None,
2775        };
2776        let result = execute(&cli).await.unwrap();
2777        assert!(result.contains("Detached from session"));
2778    }
2779
2780    #[tokio::test]
2781    async fn test_execute_input_success() {
2782        let node = start_test_server().await;
2783        let cli = Cli {
2784            node,
2785            token: None,
2786            command: Some(Commands::Input {
2787                name: "test-session".into(),
2788                text: Some("yes".into()),
2789            }),
2790            path: None,
2791        };
2792        let result = execute(&cli).await.unwrap();
2793        assert!(result.contains("Sent input to session test-session"));
2794    }
2795
2796    #[tokio::test]
2797    async fn test_execute_input_no_text() {
2798        let node = start_test_server().await;
2799        let cli = Cli {
2800            node,
2801            token: None,
2802            command: Some(Commands::Input {
2803                name: "test-session".into(),
2804                text: None,
2805            }),
2806            path: None,
2807        };
2808        let result = execute(&cli).await.unwrap();
2809        assert!(result.contains("Sent input to session test-session"));
2810    }
2811
2812    #[tokio::test]
2813    async fn test_execute_input_connection_refused() {
2814        let cli = Cli {
2815            node: "localhost:1".into(),
2816            token: None,
2817            command: Some(Commands::Input {
2818                name: "test".into(),
2819                text: Some("y".into()),
2820            }),
2821            path: None,
2822        };
2823        let result = execute(&cli).await;
2824        let err = result.unwrap_err().to_string();
2825        assert!(err.contains("Could not connect to pulpod"));
2826    }
2827
2828    #[tokio::test]
2829    async fn test_execute_input_error_response() {
2830        use axum::{Router, http::StatusCode, routing::post};
2831
2832        let app = Router::new().route(
2833            "/api/v1/sessions/{id}/input",
2834            post(|| async {
2835                (
2836                    StatusCode::NOT_FOUND,
2837                    "{\"error\":\"session not found: ghost\"}",
2838                )
2839            }),
2840        );
2841        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2842        let addr = listener.local_addr().unwrap();
2843        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2844        let node = format!("127.0.0.1:{}", addr.port());
2845
2846        let cli = Cli {
2847            node,
2848            token: None,
2849            command: Some(Commands::Input {
2850                name: "ghost".into(),
2851                text: Some("y".into()),
2852            }),
2853            path: None,
2854        };
2855        let err = execute(&cli).await.unwrap_err();
2856        assert_eq!(err.to_string(), "session not found: ghost");
2857    }
2858
2859    #[tokio::test]
2860    async fn test_execute_ui() {
2861        let cli = Cli {
2862            node: "localhost:7433".into(),
2863            token: None,
2864            command: Some(Commands::Ui),
2865            path: None,
2866        };
2867        let result = execute(&cli).await.unwrap();
2868        assert!(result.contains("Opening"));
2869        assert!(result.contains("http://localhost:7433"));
2870    }
2871
2872    #[tokio::test]
2873    async fn test_execute_ui_custom_node() {
2874        let cli = Cli {
2875            node: "mac-mini:7433".into(),
2876            token: None,
2877            command: Some(Commands::Ui),
2878            path: None,
2879        };
2880        let result = execute(&cli).await.unwrap();
2881        assert!(result.contains("http://mac-mini:7433"));
2882    }
2883
2884    #[test]
2885    fn test_format_sessions_empty() {
2886        assert_eq!(format_sessions(&[]), "No sessions.");
2887    }
2888
2889    #[test]
2890    fn test_format_sessions_with_data() {
2891        use chrono::Utc;
2892        use pulpo_common::session::SessionStatus;
2893        use uuid::Uuid;
2894
2895        let sessions = vec![Session {
2896            id: Uuid::nil(),
2897            name: "my-api".into(),
2898            workdir: "/tmp/repo".into(),
2899            command: "claude -p 'Fix the bug'".into(),
2900            description: Some("Fix the bug".into()),
2901            status: SessionStatus::Active,
2902            exit_code: None,
2903            backend_session_id: None,
2904            output_snapshot: None,
2905            metadata: None,
2906            ink: None,
2907            intervention_code: None,
2908            intervention_reason: None,
2909            intervention_at: None,
2910            last_output_at: None,
2911            idle_since: None,
2912            idle_threshold_secs: None,
2913            worktree_path: None,
2914            worktree_branch: None,
2915            git_branch: None,
2916            git_commit: None,
2917            git_files_changed: None,
2918            git_insertions: None,
2919            git_deletions: None,
2920            git_ahead: None,
2921            runtime: Runtime::Tmux,
2922            created_at: Utc::now(),
2923            updated_at: Utc::now(),
2924        }];
2925        let output = format_sessions(&sessions);
2926        assert!(output.contains("ID"));
2927        assert!(output.contains("NAME"));
2928        assert!(output.contains("BRANCH"));
2929        assert!(output.contains("COMMAND"));
2930        assert!(output.contains("00000000"));
2931        assert!(output.contains("my-api"));
2932        assert!(output.contains("active"));
2933        assert!(output.contains("claude -p 'Fix the bug'"));
2934    }
2935
2936    #[test]
2937    fn test_format_branch_without_branch() {
2938        let s = repo_session("/home/user/test", None);
2939        assert_eq!(format_branch(&s), "-");
2940    }
2941
2942    #[test]
2943    fn test_format_branch_with_branch() {
2944        let s = repo_session("/home/user/pulpo", Some("main"));
2945        assert_eq!(format_branch(&s), "main");
2946    }
2947
2948    #[test]
2949    fn test_format_branch_with_diff_stats() {
2950        let mut s = repo_session("/home/user/pulpo", Some("main"));
2951        s.git_insertions = Some(42);
2952        s.git_deletions = Some(7);
2953        let result = format_branch(&s);
2954        assert_eq!(result, "main +42/-7");
2955    }
2956
2957    #[test]
2958    fn test_format_branch_with_ahead() {
2959        let mut s = repo_session("/home/user/pulpo", Some("main"));
2960        s.git_ahead = Some(3);
2961        let result = format_branch(&s);
2962        assert!(result.contains("\u{2191}3"));
2963    }
2964
2965    #[test]
2966    fn test_format_branch_zero_diff_hidden() {
2967        let mut s = repo_session("/home/user/pulpo", None);
2968        s.git_insertions = Some(0);
2969        s.git_deletions = Some(0);
2970        let result = format_branch(&s);
2971        assert!(!result.contains("+0/-0"));
2972    }
2973
2974    #[test]
2975    fn test_format_branch_zero_ahead_hidden() {
2976        let mut s = repo_session("/home/user/pulpo", None);
2977        s.git_ahead = Some(0);
2978        let result = format_branch(&s);
2979        assert!(!result.contains('\u{2191}'));
2980    }
2981
2982    #[test]
2983    fn test_format_sessions_with_git_branch() {
2984        use chrono::Utc;
2985        use pulpo_common::session::SessionStatus;
2986        use uuid::Uuid;
2987
2988        let sessions = vec![Session {
2989            id: Uuid::nil(),
2990            name: "my-api".into(),
2991            workdir: "/tmp/repo".into(),
2992            command: "echo hello".into(),
2993            description: None,
2994            status: SessionStatus::Active,
2995            exit_code: None,
2996            backend_session_id: None,
2997            output_snapshot: None,
2998            metadata: None,
2999            ink: None,
3000            intervention_code: None,
3001            intervention_reason: None,
3002            intervention_at: None,
3003            last_output_at: None,
3004            idle_since: None,
3005            idle_threshold_secs: None,
3006            worktree_path: None,
3007            worktree_branch: None,
3008            git_branch: Some("main".into()),
3009            git_commit: Some("abc1234".into()),
3010            git_files_changed: None,
3011            git_insertions: None,
3012            git_deletions: None,
3013            git_ahead: None,
3014            runtime: Runtime::Tmux,
3015            created_at: Utc::now(),
3016            updated_at: Utc::now(),
3017        }];
3018        let output = format_sessions(&sessions);
3019        assert!(output.contains("main"), "should show branch: {output}");
3020    }
3021
3022    #[test]
3023    fn test_format_sessions_with_error_status() {
3024        use chrono::Utc;
3025        use pulpo_common::session::SessionStatus;
3026        use uuid::Uuid;
3027
3028        let mut meta = std::collections::HashMap::new();
3029        meta.insert("error_status".into(), "Compile error".into());
3030        let sessions = vec![Session {
3031            id: Uuid::nil(),
3032            name: "my-api".into(),
3033            workdir: "/tmp/repo".into(),
3034            command: "echo hello".into(),
3035            description: None,
3036            status: SessionStatus::Active,
3037            exit_code: None,
3038            backend_session_id: None,
3039            output_snapshot: None,
3040            metadata: Some(meta),
3041            ink: None,
3042            intervention_code: None,
3043            intervention_reason: None,
3044            intervention_at: None,
3045            last_output_at: None,
3046            idle_since: None,
3047            idle_threshold_secs: None,
3048            worktree_path: None,
3049            worktree_branch: None,
3050            git_branch: None,
3051            git_commit: None,
3052            git_files_changed: None,
3053            git_insertions: None,
3054            git_deletions: None,
3055            git_ahead: None,
3056            runtime: Runtime::Tmux,
3057            created_at: Utc::now(),
3058            updated_at: Utc::now(),
3059        }];
3060        let output = format_sessions(&sessions);
3061        assert!(output.contains("[!]"));
3062    }
3063
3064    #[test]
3065    fn test_format_sessions_docker_runtime() {
3066        use chrono::Utc;
3067        use pulpo_common::session::SessionStatus;
3068        use uuid::Uuid;
3069
3070        let sessions = vec![Session {
3071            id: Uuid::nil(),
3072            name: "sandbox-test".into(),
3073            workdir: "/tmp".into(),
3074            command: "claude".into(),
3075            description: None,
3076            status: SessionStatus::Active,
3077            exit_code: None,
3078            backend_session_id: Some("docker:pulpo-sandbox-test".into()),
3079            output_snapshot: None,
3080            metadata: None,
3081            ink: None,
3082            intervention_code: None,
3083            intervention_reason: None,
3084            intervention_at: None,
3085            last_output_at: None,
3086            idle_since: None,
3087            idle_threshold_secs: None,
3088            worktree_path: None,
3089            worktree_branch: None,
3090            git_branch: None,
3091            git_commit: None,
3092            git_files_changed: None,
3093            git_insertions: None,
3094            git_deletions: None,
3095            git_ahead: None,
3096            runtime: Runtime::Docker,
3097            created_at: Utc::now(),
3098            updated_at: Utc::now(),
3099        }];
3100        let output = format_sessions(&sessions);
3101        assert!(
3102            output.contains("sandbox-test"),
3103            "should show name: {output}"
3104        );
3105        assert!(
3106            output.contains('-'),
3107            "branch should show dash when None: {output}"
3108        );
3109    }
3110
3111    #[test]
3112    fn test_format_sessions_long_command_truncated() {
3113        use chrono::Utc;
3114        use pulpo_common::session::SessionStatus;
3115        use uuid::Uuid;
3116
3117        let sessions = vec![Session {
3118            id: Uuid::nil(),
3119            name: "test".into(),
3120            workdir: "/tmp".into(),
3121            command:
3122                "claude -p 'A very long command that exceeds fifty characters in total length here'"
3123                    .into(),
3124            description: None,
3125            status: SessionStatus::Ready,
3126            exit_code: None,
3127            backend_session_id: None,
3128            output_snapshot: None,
3129            metadata: None,
3130            ink: None,
3131            intervention_code: None,
3132            intervention_reason: None,
3133            intervention_at: None,
3134            last_output_at: None,
3135            idle_since: None,
3136            idle_threshold_secs: None,
3137            worktree_path: None,
3138            worktree_branch: None,
3139            git_branch: None,
3140            git_commit: None,
3141            git_files_changed: None,
3142            git_insertions: None,
3143            git_deletions: None,
3144            git_ahead: None,
3145            runtime: Runtime::Tmux,
3146            created_at: Utc::now(),
3147            updated_at: Utc::now(),
3148        }];
3149        let output = format_sessions(&sessions);
3150        assert!(output.contains("..."));
3151    }
3152
3153    #[test]
3154    fn test_format_sessions_worktree_indicator() {
3155        use chrono::Utc;
3156        use pulpo_common::session::SessionStatus;
3157        use uuid::Uuid;
3158
3159        let sessions = vec![Session {
3160            id: Uuid::nil(),
3161            name: "wt-task".into(),
3162            workdir: "/repo".into(),
3163            command: "claude".into(),
3164            description: None,
3165            status: SessionStatus::Active,
3166            exit_code: None,
3167            backend_session_id: None,
3168            output_snapshot: None,
3169            metadata: None,
3170            ink: None,
3171            intervention_code: None,
3172            intervention_reason: None,
3173            intervention_at: None,
3174            last_output_at: None,
3175            idle_since: None,
3176            idle_threshold_secs: None,
3177            worktree_path: Some("/home/user/.pulpo/worktrees/wt-task".into()),
3178            worktree_branch: Some("wt-task".into()),
3179            git_branch: None,
3180            git_commit: None,
3181            git_files_changed: None,
3182            git_insertions: None,
3183            git_deletions: None,
3184            git_ahead: None,
3185            runtime: Runtime::Tmux,
3186            created_at: Utc::now(),
3187            updated_at: Utc::now(),
3188        }];
3189        let output = format_sessions(&sessions);
3190        assert!(
3191            output.contains("[wt]"),
3192            "should show worktree indicator: {output}"
3193        );
3194        assert!(output.contains("wt-task [wt]"));
3195    }
3196
3197    #[test]
3198    fn test_format_sessions_pr_indicator() {
3199        use chrono::Utc;
3200        use pulpo_common::session::SessionStatus;
3201        use std::collections::HashMap;
3202        use uuid::Uuid;
3203
3204        let mut meta = HashMap::new();
3205        meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
3206        let sessions = vec![Session {
3207            id: Uuid::nil(),
3208            name: "pr-task".into(),
3209            workdir: "/tmp".into(),
3210            command: "claude".into(),
3211            description: None,
3212            status: SessionStatus::Active,
3213            exit_code: None,
3214            backend_session_id: None,
3215            output_snapshot: None,
3216            metadata: Some(meta),
3217            ink: None,
3218            intervention_code: None,
3219            intervention_reason: None,
3220            intervention_at: None,
3221            last_output_at: None,
3222            idle_since: None,
3223            idle_threshold_secs: None,
3224            worktree_path: None,
3225            worktree_branch: None,
3226            git_branch: None,
3227            git_commit: None,
3228            git_files_changed: None,
3229            git_insertions: None,
3230            git_deletions: None,
3231            git_ahead: None,
3232            runtime: Runtime::Tmux,
3233            created_at: Utc::now(),
3234            updated_at: Utc::now(),
3235        }];
3236        let output = format_sessions(&sessions);
3237        assert!(
3238            output.contains("[PR]"),
3239            "should show PR indicator: {output}"
3240        );
3241        assert!(output.contains("pr-task [PR]"));
3242    }
3243
3244    #[test]
3245    fn test_format_sessions_worktree_and_pr_indicator() {
3246        use chrono::Utc;
3247        use pulpo_common::session::SessionStatus;
3248        use std::collections::HashMap;
3249        use uuid::Uuid;
3250
3251        let mut meta = HashMap::new();
3252        meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
3253        let sessions = vec![Session {
3254            id: Uuid::nil(),
3255            name: "both-task".into(),
3256            workdir: "/tmp".into(),
3257            command: "claude".into(),
3258            description: None,
3259            status: SessionStatus::Active,
3260            exit_code: None,
3261            backend_session_id: None,
3262            output_snapshot: None,
3263            metadata: Some(meta),
3264            ink: None,
3265            intervention_code: None,
3266            intervention_reason: None,
3267            intervention_at: None,
3268            last_output_at: None,
3269            idle_since: None,
3270            idle_threshold_secs: None,
3271            worktree_path: Some("/home/user/.pulpo/worktrees/both-task".into()),
3272            worktree_branch: Some("both-task".into()),
3273            git_branch: None,
3274            git_commit: None,
3275            git_files_changed: None,
3276            git_insertions: None,
3277            git_deletions: None,
3278            git_ahead: None,
3279            runtime: Runtime::Tmux,
3280            created_at: Utc::now(),
3281            updated_at: Utc::now(),
3282        }];
3283        let output = format_sessions(&sessions);
3284        assert!(
3285            output.contains("[wt] [PR]"),
3286            "should show both indicators: {output}"
3287        );
3288    }
3289
3290    #[test]
3291    fn test_format_sessions_no_pr_without_metadata() {
3292        use chrono::Utc;
3293        use pulpo_common::session::SessionStatus;
3294        use uuid::Uuid;
3295
3296        let sessions = vec![Session {
3297            id: Uuid::nil(),
3298            name: "no-pr".into(),
3299            workdir: "/tmp".into(),
3300            command: "claude".into(),
3301            description: None,
3302            status: SessionStatus::Active,
3303            exit_code: None,
3304            backend_session_id: None,
3305            output_snapshot: None,
3306            metadata: None,
3307            ink: None,
3308            intervention_code: None,
3309            intervention_reason: None,
3310            intervention_at: None,
3311            last_output_at: None,
3312            idle_since: None,
3313            idle_threshold_secs: None,
3314            worktree_path: None,
3315            worktree_branch: None,
3316            git_branch: None,
3317            git_commit: None,
3318            git_files_changed: None,
3319            git_insertions: None,
3320            git_deletions: None,
3321            git_ahead: None,
3322            runtime: Runtime::Tmux,
3323            created_at: Utc::now(),
3324            updated_at: Utc::now(),
3325        }];
3326        let output = format_sessions(&sessions);
3327        assert!(
3328            !output.contains("[PR]"),
3329            "should not show PR indicator: {output}"
3330        );
3331    }
3332
3333    #[test]
3334    fn test_format_nodes() {
3335        use pulpo_common::node::NodeInfo;
3336        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
3337
3338        let resp = PeersResponse {
3339            local: NodeInfo {
3340                name: "mac-mini".into(),
3341                hostname: "h".into(),
3342                os: "macos".into(),
3343                arch: "arm64".into(),
3344                cpus: 8,
3345                memory_mb: 16384,
3346                gpu: None,
3347            },
3348            peers: vec![PeerInfo {
3349                name: "win-pc".into(),
3350                address: "win-pc:7433".into(),
3351                status: PeerStatus::Online,
3352                node_info: None,
3353                session_count: Some(3),
3354                source: PeerSource::Configured,
3355            }],
3356        };
3357        let output = format_nodes(&resp);
3358        assert!(output.contains("mac-mini"));
3359        assert!(output.contains("(local)"));
3360        assert!(output.contains("win-pc"));
3361        assert!(output.contains('3'));
3362    }
3363
3364    #[test]
3365    fn test_format_nodes_no_session_count() {
3366        use pulpo_common::node::NodeInfo;
3367        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
3368
3369        let resp = PeersResponse {
3370            local: NodeInfo {
3371                name: "local".into(),
3372                hostname: "h".into(),
3373                os: "linux".into(),
3374                arch: "x86_64".into(),
3375                cpus: 4,
3376                memory_mb: 8192,
3377                gpu: None,
3378            },
3379            peers: vec![PeerInfo {
3380                name: "peer".into(),
3381                address: "peer:7433".into(),
3382                status: PeerStatus::Offline,
3383                node_info: None,
3384                session_count: None,
3385                source: PeerSource::Configured,
3386            }],
3387        };
3388        let output = format_nodes(&resp);
3389        assert!(output.contains("offline"));
3390        // No session count → shows "-"
3391        let lines: Vec<&str> = output.lines().collect();
3392        assert!(lines[2].contains('-'));
3393    }
3394
3395    #[tokio::test]
3396    async fn test_execute_resume_connection_refused() {
3397        let cli = Cli {
3398            node: "localhost:1".into(),
3399            token: None,
3400            command: Some(Commands::Resume {
3401                name: "test".into(),
3402            }),
3403            path: None,
3404        };
3405        let result = execute(&cli).await;
3406        let err = result.unwrap_err().to_string();
3407        assert!(err.contains("Could not connect to pulpod"));
3408    }
3409
3410    #[tokio::test]
3411    async fn test_execute_spawn_connection_refused() {
3412        let cli = Cli {
3413            node: "localhost:1".into(),
3414            token: None,
3415            command: Some(Commands::Spawn {
3416                name: Some("test".into()),
3417                workdir: Some("/tmp".into()),
3418                ink: None,
3419                description: None,
3420                detach: true,
3421                idle_threshold: None,
3422                auto: false,
3423                worktree: false,
3424                worktree_base: None,
3425                runtime: None,
3426                secret: vec![],
3427                command: vec!["test".into()],
3428            }),
3429            path: None,
3430        };
3431        let result = execute(&cli).await;
3432        let err = result.unwrap_err().to_string();
3433        assert!(err.contains("Could not connect to pulpod"));
3434    }
3435
3436    #[tokio::test]
3437    async fn test_execute_stop_connection_refused() {
3438        let cli = Cli {
3439            node: "localhost:1".into(),
3440            token: None,
3441            command: Some(Commands::Stop {
3442                names: vec!["test".into()],
3443                purge: false,
3444            }),
3445            path: None,
3446        };
3447        let result = execute(&cli).await;
3448        let err = result.unwrap_err().to_string();
3449        assert!(err.contains("Could not connect to pulpod"));
3450    }
3451
3452    #[tokio::test]
3453    async fn test_execute_logs_connection_refused() {
3454        let cli = Cli {
3455            node: "localhost:1".into(),
3456            token: None,
3457            command: Some(Commands::Logs {
3458                name: "test".into(),
3459                lines: 50,
3460                follow: false,
3461            }),
3462            path: None,
3463        };
3464        let result = execute(&cli).await;
3465        let err = result.unwrap_err().to_string();
3466        assert!(err.contains("Could not connect to pulpod"));
3467    }
3468
3469    #[tokio::test]
3470    async fn test_friendly_error_connect() {
3471        // Make a request to a closed port to get a connect error
3472        let err = reqwest::Client::new()
3473            .get("http://127.0.0.1:1")
3474            .send()
3475            .await
3476            .unwrap_err();
3477        let friendly = friendly_error(&err, "test-node:1");
3478        let msg = friendly.to_string();
3479        assert!(
3480            msg.contains("Could not connect"),
3481            "Expected connect message, got: {msg}"
3482        );
3483    }
3484
3485    #[tokio::test]
3486    async fn test_friendly_error_other() {
3487        // A request to an invalid URL creates a builder error, not a connect error
3488        let err = reqwest::Client::new()
3489            .get("http://[::invalid::url")
3490            .send()
3491            .await
3492            .unwrap_err();
3493        let friendly = friendly_error(&err, "bad-host");
3494        let msg = friendly.to_string();
3495        assert!(
3496            msg.contains("Network error"),
3497            "Expected network error message, got: {msg}"
3498        );
3499        assert!(msg.contains("bad-host"));
3500    }
3501
3502    // -- Auth helper tests --
3503
3504    #[test]
3505    fn test_is_localhost_variants() {
3506        assert!(is_localhost("localhost:7433"));
3507        assert!(is_localhost("127.0.0.1:7433"));
3508        assert!(is_localhost("[::1]:7433"));
3509        assert!(is_localhost("::1"));
3510        assert!(is_localhost("localhost"));
3511        assert!(!is_localhost("mac-mini:7433"));
3512        assert!(!is_localhost("192.168.1.100:7433"));
3513    }
3514
3515    #[test]
3516    fn test_authed_get_with_token() {
3517        let client = reqwest::Client::new();
3518        let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
3519            .build()
3520            .unwrap();
3521        let auth = req
3522            .headers()
3523            .get("authorization")
3524            .unwrap()
3525            .to_str()
3526            .unwrap();
3527        assert_eq!(auth, "Bearer tok");
3528    }
3529
3530    #[test]
3531    fn test_authed_get_without_token() {
3532        let client = reqwest::Client::new();
3533        let req = authed_get(&client, "http://h:1/api".into(), None)
3534            .build()
3535            .unwrap();
3536        assert!(req.headers().get("authorization").is_none());
3537    }
3538
3539    #[test]
3540    fn test_authed_post_with_token() {
3541        let client = reqwest::Client::new();
3542        let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
3543            .build()
3544            .unwrap();
3545        let auth = req
3546            .headers()
3547            .get("authorization")
3548            .unwrap()
3549            .to_str()
3550            .unwrap();
3551        assert_eq!(auth, "Bearer secret");
3552    }
3553
3554    #[test]
3555    fn test_authed_post_without_token() {
3556        let client = reqwest::Client::new();
3557        let req = authed_post(&client, "http://h:1/api".into(), None)
3558            .build()
3559            .unwrap();
3560        assert!(req.headers().get("authorization").is_none());
3561    }
3562
3563    #[test]
3564    fn test_authed_delete_with_token() {
3565        let client = reqwest::Client::new();
3566        let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
3567            .build()
3568            .unwrap();
3569        let auth = req
3570            .headers()
3571            .get("authorization")
3572            .unwrap()
3573            .to_str()
3574            .unwrap();
3575        assert_eq!(auth, "Bearer del-tok");
3576    }
3577
3578    #[test]
3579    fn test_authed_delete_without_token() {
3580        let client = reqwest::Client::new();
3581        let req = authed_delete(&client, "http://h:1/api".into(), None)
3582            .build()
3583            .unwrap();
3584        assert!(req.headers().get("authorization").is_none());
3585    }
3586
3587    #[tokio::test]
3588    async fn test_resolve_token_explicit() {
3589        let client = reqwest::Client::new();
3590        let token =
3591            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
3592        assert_eq!(token, Some("my-tok".into()));
3593    }
3594
3595    #[tokio::test]
3596    async fn test_resolve_token_remote_no_explicit() {
3597        let client = reqwest::Client::new();
3598        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
3599        assert_eq!(token, None);
3600    }
3601
3602    #[tokio::test]
3603    async fn test_resolve_token_localhost_auto_discover() {
3604        use axum::{Json, Router, routing::get};
3605
3606        let app = Router::new().route(
3607            "/api/v1/auth/token",
3608            get(|| async {
3609                Json(AuthTokenResponse {
3610                    token: "discovered".into(),
3611                })
3612            }),
3613        );
3614        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3615        let addr = listener.local_addr().unwrap();
3616        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3617
3618        let node = format!("localhost:{}", addr.port());
3619        let base = base_url(&node);
3620        let client = reqwest::Client::new();
3621        let token = resolve_token(&client, &base, &node, None).await;
3622        assert_eq!(token, Some("discovered".into()));
3623    }
3624
3625    #[tokio::test]
3626    async fn test_discover_token_empty_returns_none() {
3627        use axum::{Json, Router, routing::get};
3628
3629        let app = Router::new().route(
3630            "/api/v1/auth/token",
3631            get(|| async {
3632                Json(AuthTokenResponse {
3633                    token: String::new(),
3634                })
3635            }),
3636        );
3637        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3638        let addr = listener.local_addr().unwrap();
3639        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3640
3641        let base = format!("http://127.0.0.1:{}", addr.port());
3642        let client = reqwest::Client::new();
3643        assert_eq!(discover_token(&client, &base).await, None);
3644    }
3645
3646    #[tokio::test]
3647    async fn test_discover_token_unreachable_returns_none() {
3648        let client = reqwest::Client::new();
3649        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3650    }
3651
3652    #[test]
3653    fn test_cli_parse_with_token() {
3654        let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
3655        assert_eq!(cli.token, Some("my-secret".into()));
3656    }
3657
3658    #[test]
3659    fn test_cli_parse_without_token() {
3660        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
3661        assert_eq!(cli.token, None);
3662    }
3663
3664    #[tokio::test]
3665    async fn test_execute_with_explicit_token_sends_header() {
3666        use axum::{Router, extract::Request, http::StatusCode, routing::get};
3667
3668        let app = Router::new().route(
3669            "/api/v1/sessions",
3670            get(|req: Request| async move {
3671                let auth = req
3672                    .headers()
3673                    .get("authorization")
3674                    .and_then(|v| v.to_str().ok())
3675                    .unwrap_or("");
3676                assert_eq!(auth, "Bearer test-token");
3677                (StatusCode::OK, "[]".to_owned())
3678            }),
3679        );
3680        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3681        let addr = listener.local_addr().unwrap();
3682        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3683        let node = format!("127.0.0.1:{}", addr.port());
3684
3685        let cli = Cli {
3686            node,
3687            token: Some("test-token".into()),
3688            command: Some(Commands::List { all: false }),
3689            path: None,
3690        };
3691        let result = execute(&cli).await.unwrap();
3692        assert_eq!(result, "No sessions.");
3693    }
3694
3695    // -- Interventions tests --
3696
3697    #[test]
3698    fn test_cli_parse_interventions() {
3699        let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
3700        assert!(matches!(
3701            &cli.command,
3702            Some(Commands::Interventions { name }) if name == "my-session"
3703        ));
3704    }
3705
3706    #[test]
3707    fn test_format_interventions_empty() {
3708        assert_eq!(format_interventions(&[]), "No intervention events.");
3709    }
3710
3711    #[test]
3712    fn test_format_interventions_with_data() {
3713        let events = vec![
3714            InterventionEventResponse {
3715                id: 1,
3716                session_id: "sess-1".into(),
3717                code: None,
3718                reason: "Memory exceeded threshold".into(),
3719                created_at: "2026-01-01T00:00:00Z".into(),
3720            },
3721            InterventionEventResponse {
3722                id: 2,
3723                session_id: "sess-1".into(),
3724                code: None,
3725                reason: "Idle for 10 minutes".into(),
3726                created_at: "2026-01-02T00:00:00Z".into(),
3727            },
3728        ];
3729        let output = format_interventions(&events);
3730        assert!(output.contains("ID"));
3731        assert!(output.contains("TIMESTAMP"));
3732        assert!(output.contains("REASON"));
3733        assert!(output.contains("Memory exceeded threshold"));
3734        assert!(output.contains("Idle for 10 minutes"));
3735        assert!(output.contains("2026-01-01T00:00:00Z"));
3736    }
3737
3738    #[tokio::test]
3739    async fn test_execute_interventions_empty() {
3740        let node = start_test_server().await;
3741        let cli = Cli {
3742            node,
3743            token: None,
3744            command: Some(Commands::Interventions {
3745                name: "my-session".into(),
3746            }),
3747            path: None,
3748        };
3749        let result = execute(&cli).await.unwrap();
3750        assert_eq!(result, "No intervention events.");
3751    }
3752
3753    #[tokio::test]
3754    async fn test_execute_interventions_with_data() {
3755        use axum::{Router, routing::get};
3756
3757        let app = Router::new().route(
3758            "/api/v1/sessions/{id}/interventions",
3759            get(|| async {
3760                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3761                    .to_owned()
3762            }),
3763        );
3764        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3765        let addr = listener.local_addr().unwrap();
3766        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3767        let node = format!("127.0.0.1:{}", addr.port());
3768
3769        let cli = Cli {
3770            node,
3771            token: None,
3772            command: Some(Commands::Interventions {
3773                name: "test".into(),
3774            }),
3775            path: None,
3776        };
3777        let result = execute(&cli).await.unwrap();
3778        assert!(result.contains("OOM"));
3779        assert!(result.contains("2026-01-01T00:00:00Z"));
3780    }
3781
3782    #[tokio::test]
3783    async fn test_execute_interventions_connection_refused() {
3784        let cli = Cli {
3785            node: "localhost:1".into(),
3786            token: None,
3787            command: Some(Commands::Interventions {
3788                name: "test".into(),
3789            }),
3790            path: None,
3791        };
3792        let result = execute(&cli).await;
3793        let err = result.unwrap_err().to_string();
3794        assert!(err.contains("Could not connect to pulpod"));
3795    }
3796
3797    // -- Attach command tests --
3798
3799    #[test]
3800    fn test_build_attach_command_tmux() {
3801        let cmd = build_attach_command("my-session");
3802        assert_eq!(cmd.get_program(), "tmux");
3803        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3804        assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
3805    }
3806
3807    #[test]
3808    fn test_build_attach_command_docker() {
3809        let cmd = build_attach_command("docker:pulpo-my-task");
3810        assert_eq!(cmd.get_program(), "docker");
3811        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3812        assert_eq!(args, vec!["exec", "-it", "pulpo-my-task", "/bin/sh"]);
3813    }
3814
3815    #[test]
3816    fn test_cli_parse_attach() {
3817        let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
3818        assert!(matches!(
3819            &cli.command,
3820            Some(Commands::Attach { name }) if name == "my-session"
3821        ));
3822    }
3823
3824    #[test]
3825    fn test_cli_parse_attach_alias() {
3826        let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
3827        assert!(matches!(
3828            &cli.command,
3829            Some(Commands::Attach { name }) if name == "my-session"
3830        ));
3831    }
3832
3833    #[tokio::test]
3834    async fn test_execute_attach_success() {
3835        let node = start_test_server().await;
3836        let cli = Cli {
3837            node,
3838            token: None,
3839            command: Some(Commands::Attach {
3840                name: "test-session".into(),
3841            }),
3842            path: None,
3843        };
3844        let result = execute(&cli).await.unwrap();
3845        assert!(result.contains("Detached from session test-session"));
3846    }
3847
3848    #[tokio::test]
3849    async fn test_execute_attach_with_backend_session_id() {
3850        use axum::{Router, routing::get};
3851        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3852        let app = Router::new().route(
3853            "/api/v1/sessions/{id}",
3854            get(move || async move { session_json.to_owned() }),
3855        );
3856        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3857        let addr = listener.local_addr().unwrap();
3858        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3859
3860        let cli = Cli {
3861            node: format!("127.0.0.1:{}", addr.port()),
3862            token: None,
3863            command: Some(Commands::Attach {
3864                name: "my-session".into(),
3865            }),
3866            path: None,
3867        };
3868        let result = execute(&cli).await.unwrap();
3869        assert!(result.contains("Detached from session my-session"));
3870    }
3871
3872    #[tokio::test]
3873    async fn test_execute_attach_connection_refused() {
3874        let cli = Cli {
3875            node: "localhost:1".into(),
3876            token: None,
3877            command: Some(Commands::Attach {
3878                name: "test-session".into(),
3879            }),
3880            path: None,
3881        };
3882        let result = execute(&cli).await;
3883        let err = result.unwrap_err().to_string();
3884        assert!(err.contains("Could not connect to pulpod"));
3885    }
3886
3887    #[tokio::test]
3888    async fn test_execute_attach_error_response() {
3889        use axum::{Router, http::StatusCode, routing::get};
3890        let app = Router::new().route(
3891            "/api/v1/sessions/{id}",
3892            get(|| async {
3893                (
3894                    StatusCode::NOT_FOUND,
3895                    r#"{"error":"session not found"}"#.to_owned(),
3896                )
3897            }),
3898        );
3899        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3900        let addr = listener.local_addr().unwrap();
3901        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3902
3903        let cli = Cli {
3904            node: format!("127.0.0.1:{}", addr.port()),
3905            token: None,
3906            command: Some(Commands::Attach {
3907                name: "nonexistent".into(),
3908            }),
3909            path: None,
3910        };
3911        let result = execute(&cli).await;
3912        let err = result.unwrap_err().to_string();
3913        assert!(err.contains("session not found"));
3914    }
3915
3916    #[tokio::test]
3917    async fn test_execute_attach_stale_session() {
3918        use axum::{Router, routing::get};
3919        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3920        let app = Router::new().route(
3921            "/api/v1/sessions/{id}",
3922            get(move || async move { session_json.to_owned() }),
3923        );
3924        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3925        let addr = listener.local_addr().unwrap();
3926        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3927
3928        let cli = Cli {
3929            node: format!("127.0.0.1:{}", addr.port()),
3930            token: None,
3931            command: Some(Commands::Attach {
3932                name: "stale-sess".into(),
3933            }),
3934            path: None,
3935        };
3936        let result = execute(&cli).await;
3937        let err = result.unwrap_err().to_string();
3938        assert!(err.contains("lost"));
3939        assert!(err.contains("pulpo resume"));
3940    }
3941
3942    #[tokio::test]
3943    async fn test_execute_attach_dead_session() {
3944        use axum::{Router, routing::get};
3945        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3946        let app = Router::new().route(
3947            "/api/v1/sessions/{id}",
3948            get(move || async move { session_json.to_owned() }),
3949        );
3950        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3951        let addr = listener.local_addr().unwrap();
3952        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3953
3954        let cli = Cli {
3955            node: format!("127.0.0.1:{}", addr.port()),
3956            token: None,
3957            command: Some(Commands::Attach {
3958                name: "dead-sess".into(),
3959            }),
3960            path: None,
3961        };
3962        let result = execute(&cli).await;
3963        let err = result.unwrap_err().to_string();
3964        assert!(err.contains("stopped"));
3965        assert!(err.contains("cannot attach"));
3966    }
3967
3968    // -- Alias parse tests --
3969
3970    #[test]
3971    fn test_cli_parse_alias_spawn() {
3972        let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
3973        assert!(matches!(&cli.command, Some(Commands::Spawn { .. })));
3974    }
3975
3976    #[test]
3977    fn test_cli_parse_alias_list() {
3978        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
3979        assert!(matches!(&cli.command, Some(Commands::List { all: false })));
3980    }
3981
3982    #[test]
3983    fn test_cli_parse_list_all() {
3984        let cli = Cli::try_parse_from(["pulpo", "ls", "-a"]).unwrap();
3985        assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3986
3987        let cli = Cli::try_parse_from(["pulpo", "list", "--all"]).unwrap();
3988        assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3989    }
3990
3991    #[test]
3992    fn test_cli_parse_alias_logs() {
3993        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
3994        assert!(matches!(
3995            &cli.command,
3996            Some(Commands::Logs { name, .. }) if name == "my-session"
3997        ));
3998    }
3999
4000    #[test]
4001    fn test_cli_parse_alias_stop() {
4002        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
4003        assert!(matches!(
4004            &cli.command,
4005            Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
4006        ));
4007    }
4008
4009    #[test]
4010    fn test_cli_parse_alias_resume() {
4011        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
4012        assert!(matches!(
4013            &cli.command,
4014            Some(Commands::Resume { name }) if name == "my-session"
4015        ));
4016    }
4017
4018    #[test]
4019    fn test_cli_parse_alias_nodes() {
4020        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
4021        assert!(matches!(&cli.command, Some(Commands::Nodes)));
4022    }
4023
4024    #[test]
4025    fn test_cli_parse_alias_interventions() {
4026        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
4027        assert!(matches!(
4028            &cli.command,
4029            Some(Commands::Interventions { name }) if name == "my-session"
4030        ));
4031    }
4032
4033    #[test]
4034    fn test_api_error_json() {
4035        let err = api_error("{\"error\":\"session not found: foo\"}");
4036        assert_eq!(err.to_string(), "session not found: foo");
4037    }
4038
4039    #[test]
4040    fn test_api_error_plain_text() {
4041        let err = api_error("plain text error");
4042        assert_eq!(err.to_string(), "plain text error");
4043    }
4044
4045    // -- diff_output tests --
4046
4047    #[test]
4048    fn test_diff_output_empty_prev() {
4049        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
4050    }
4051
4052    #[test]
4053    fn test_diff_output_identical() {
4054        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
4055    }
4056
4057    #[test]
4058    fn test_diff_output_new_lines_appended() {
4059        let prev = "line1\nline2";
4060        let new = "line1\nline2\nline3\nline4";
4061        assert_eq!(diff_output(prev, new), "line3\nline4");
4062    }
4063
4064    #[test]
4065    fn test_diff_output_scrolled_window() {
4066        // Window of 3 lines: old lines scroll off top, new appear at bottom
4067        let prev = "line1\nline2\nline3";
4068        let new = "line2\nline3\nline4";
4069        assert_eq!(diff_output(prev, new), "line4");
4070    }
4071
4072    #[test]
4073    fn test_diff_output_completely_different() {
4074        let prev = "aaa\nbbb";
4075        let new = "xxx\nyyy";
4076        assert_eq!(diff_output(prev, new), "xxx\nyyy");
4077    }
4078
4079    #[test]
4080    fn test_diff_output_last_line_matches_but_overlap_fails() {
4081        // Last line of prev appears in new but preceding lines don't match
4082        let prev = "aaa\ncommon";
4083        let new = "zzz\ncommon\nnew_line";
4084        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
4085        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
4086        // Falls through, no verified overlap, so returns everything
4087        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
4088    }
4089
4090    #[test]
4091    fn test_diff_output_new_empty() {
4092        assert_eq!(diff_output("line1", ""), "");
4093    }
4094
4095    // -- follow_logs tests --
4096
4097    /// Start a test server that simulates evolving output and session status transitions.
4098    /// Start a test server that simulates evolving output with agent exit marker.
4099    async fn start_follow_test_server() -> String {
4100        use axum::{Router, extract::Path, extract::Query, routing::get};
4101        use std::sync::Arc;
4102        use std::sync::atomic::{AtomicUsize, Ordering};
4103
4104        let call_count = Arc::new(AtomicUsize::new(0));
4105        let output_count = call_count.clone();
4106
4107        let app = Router::new()
4108            .route(
4109                "/api/v1/sessions/{id}/output",
4110                get(
4111                    move |_path: Path<String>,
4112                          _query: Query<std::collections::HashMap<String, String>>| {
4113                        let count = output_count.clone();
4114                        async move {
4115                            let n = count.fetch_add(1, Ordering::SeqCst);
4116                            let output = match n {
4117                                0 => "line1\nline2".to_owned(),
4118                                1 => "line1\nline2\nline3".to_owned(),
4119                                _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
4120                            };
4121                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
4122                        }
4123                    },
4124                ),
4125            )
4126            .route(
4127                "/api/v1/sessions/{id}",
4128                get(|_path: Path<String>| async {
4129                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
4130                }),
4131            );
4132
4133        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4134        let addr = listener.local_addr().unwrap();
4135        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4136        format!("http://127.0.0.1:{}", addr.port())
4137    }
4138
4139    #[tokio::test]
4140    async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
4141        let base = start_follow_test_server().await;
4142        let client = reqwest::Client::new();
4143        let mut buf = Vec::new();
4144
4145        follow_logs(&client, &base, "test", 100, None, &mut buf)
4146            .await
4147            .unwrap();
4148
4149        let output = String::from_utf8(buf).unwrap();
4150        // Should contain initial output + new lines + agent exit marker
4151        assert!(output.contains("line1"));
4152        assert!(output.contains("line2"));
4153        assert!(output.contains("line3"));
4154        assert!(output.contains("line4"));
4155        assert!(output.contains("[pulpo] Agent exited"));
4156    }
4157
4158    #[tokio::test]
4159    async fn test_execute_logs_follow_success() {
4160        let base = start_follow_test_server().await;
4161        // Extract host:port from http://127.0.0.1:PORT
4162        let node = base.strip_prefix("http://").unwrap().to_owned();
4163
4164        let cli = Cli {
4165            node,
4166            token: None,
4167            command: Some(Commands::Logs {
4168                name: "test".into(),
4169                lines: 100,
4170                follow: true,
4171            }),
4172            path: None,
4173        };
4174        // execute() with follow writes to stdout and returns empty string
4175        let result = execute(&cli).await.unwrap();
4176        assert_eq!(result, "");
4177    }
4178
4179    #[tokio::test]
4180    async fn test_execute_logs_follow_connection_refused() {
4181        let cli = Cli {
4182            node: "localhost:1".into(),
4183            token: None,
4184            command: Some(Commands::Logs {
4185                name: "test".into(),
4186                lines: 50,
4187                follow: true,
4188            }),
4189            path: None,
4190        };
4191        let result = execute(&cli).await;
4192        let err = result.unwrap_err().to_string();
4193        assert!(
4194            err.contains("Could not connect to pulpod"),
4195            "Expected friendly error, got: {err}"
4196        );
4197    }
4198
4199    #[tokio::test]
4200    async fn test_follow_logs_exits_on_dead() {
4201        use axum::{Router, extract::Path, extract::Query, routing::get};
4202
4203        let app = Router::new()
4204            .route(
4205                "/api/v1/sessions/{id}/output",
4206                get(
4207                    |_path: Path<String>,
4208                     _query: Query<std::collections::HashMap<String, String>>| async {
4209                        r#"{"output":"some output"}"#.to_owned()
4210                    },
4211                ),
4212            )
4213            .route(
4214                "/api/v1/sessions/{id}",
4215                get(|_path: Path<String>| async {
4216                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
4217                }),
4218            );
4219
4220        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4221        let addr = listener.local_addr().unwrap();
4222        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4223        let base = format!("http://127.0.0.1:{}", addr.port());
4224
4225        let client = reqwest::Client::new();
4226        let mut buf = Vec::new();
4227        follow_logs(&client, &base, "test", 100, None, &mut buf)
4228            .await
4229            .unwrap();
4230
4231        let output = String::from_utf8(buf).unwrap();
4232        assert!(output.contains("some output"));
4233    }
4234
4235    #[tokio::test]
4236    async fn test_follow_logs_exits_on_stale() {
4237        use axum::{Router, extract::Path, extract::Query, routing::get};
4238
4239        let app = Router::new()
4240            .route(
4241                "/api/v1/sessions/{id}/output",
4242                get(
4243                    |_path: Path<String>,
4244                     _query: Query<std::collections::HashMap<String, String>>| async {
4245                        r#"{"output":"stale output"}"#.to_owned()
4246                    },
4247                ),
4248            )
4249            .route(
4250                "/api/v1/sessions/{id}",
4251                get(|_path: Path<String>| async {
4252                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
4253                }),
4254            );
4255
4256        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4257        let addr = listener.local_addr().unwrap();
4258        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4259        let base = format!("http://127.0.0.1:{}", addr.port());
4260
4261        let client = reqwest::Client::new();
4262        let mut buf = Vec::new();
4263        follow_logs(&client, &base, "test", 100, None, &mut buf)
4264            .await
4265            .unwrap();
4266
4267        let output = String::from_utf8(buf).unwrap();
4268        assert!(output.contains("stale output"));
4269    }
4270
4271    #[tokio::test]
4272    async fn test_execute_logs_follow_non_reqwest_error() {
4273        use axum::{Router, extract::Path, extract::Query, routing::get};
4274
4275        // Session status endpoint returns invalid JSON to trigger a serde error
4276        let app = Router::new()
4277            .route(
4278                "/api/v1/sessions/{id}/output",
4279                get(
4280                    |_path: Path<String>,
4281                     _query: Query<std::collections::HashMap<String, String>>| async {
4282                        r#"{"output":"initial"}"#.to_owned()
4283                    },
4284                ),
4285            )
4286            .route(
4287                "/api/v1/sessions/{id}",
4288                get(|_path: Path<String>| async { "not valid json".to_owned() }),
4289            );
4290
4291        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4292        let addr = listener.local_addr().unwrap();
4293        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4294        let node = format!("127.0.0.1:{}", addr.port());
4295
4296        let cli = Cli {
4297            node,
4298            token: None,
4299            command: Some(Commands::Logs {
4300                name: "test".into(),
4301                lines: 100,
4302                follow: true,
4303            }),
4304            path: None,
4305        };
4306        let err = execute(&cli).await.unwrap_err();
4307        // serde_json error, not a reqwest error — hits the Err(other) branch
4308        let msg = err.to_string();
4309        assert!(
4310            msg.contains("expected ident"),
4311            "Expected serde parse error, got: {msg}"
4312        );
4313    }
4314
4315    #[tokio::test]
4316    async fn test_fetch_session_status_connection_error() {
4317        let client = reqwest::Client::new();
4318        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
4319        assert!(result.is_err());
4320    }
4321
4322    // -- Schedule tests --
4323
4324    #[test]
4325    fn test_format_schedules_empty() {
4326        assert_eq!(format_schedules(&[]), "No schedules.");
4327    }
4328
4329    #[test]
4330    fn test_format_schedules_with_entries() {
4331        let schedules = vec![serde_json::json!({
4332            "name": "nightly",
4333            "cron": "0 3 * * *",
4334            "enabled": true,
4335            "last_run_at": null,
4336            "target_node": null
4337        })];
4338        let output = format_schedules(&schedules);
4339        assert!(output.contains("nightly"));
4340        assert!(output.contains("0 3 * * *"));
4341        assert!(output.contains("local"));
4342        assert!(output.contains("yes"));
4343        assert!(output.contains('-'));
4344    }
4345
4346    #[test]
4347    fn test_format_schedules_disabled_entry() {
4348        let schedules = vec![serde_json::json!({
4349            "name": "weekly",
4350            "cron": "0 0 * * 0",
4351            "enabled": false,
4352            "last_run_at": "2026-03-18T03:00:00Z",
4353            "target_node": "gpu-box"
4354        })];
4355        let output = format_schedules(&schedules);
4356        assert!(output.contains("weekly"));
4357        assert!(output.contains("no"));
4358        assert!(output.contains("gpu-box"));
4359        assert!(output.contains("2026-03-18T03:00"));
4360    }
4361
4362    #[test]
4363    fn test_format_schedules_header() {
4364        let schedules = vec![serde_json::json!({
4365            "name": "test",
4366            "cron": "* * * * *",
4367            "enabled": true,
4368            "last_run_at": null,
4369            "target_node": null
4370        })];
4371        let output = format_schedules(&schedules);
4372        assert!(output.contains("NAME"));
4373        assert!(output.contains("CRON"));
4374        assert!(output.contains("ENABLED"));
4375        assert!(output.contains("LAST RUN"));
4376        assert!(output.contains("NODE"));
4377    }
4378
4379    // -- Schedule CLI parse tests --
4380
4381    #[test]
4382    fn test_cli_parse_schedule_add() {
4383        let cli = Cli::try_parse_from([
4384            "pulpo",
4385            "schedule",
4386            "add",
4387            "nightly",
4388            "0 3 * * *",
4389            "--workdir",
4390            "/repo",
4391            "--",
4392            "claude",
4393            "-p",
4394            "review",
4395        ])
4396        .unwrap();
4397        assert!(matches!(
4398            &cli.command,
4399            Some(Commands::Schedule {
4400                action: ScheduleAction::Add { name, cron, .. }
4401            }) if name == "nightly" && cron == "0 3 * * *"
4402        ));
4403    }
4404
4405    #[test]
4406    fn test_cli_parse_schedule_add_with_node() {
4407        let cli = Cli::try_parse_from([
4408            "pulpo",
4409            "schedule",
4410            "add",
4411            "nightly",
4412            "0 3 * * *",
4413            "--workdir",
4414            "/repo",
4415            "--node",
4416            "gpu-box",
4417            "--",
4418            "claude",
4419        ])
4420        .unwrap();
4421        assert!(matches!(
4422            &cli.command,
4423            Some(Commands::Schedule {
4424                action: ScheduleAction::Add { node, .. }
4425            }) if node.as_deref() == Some("gpu-box")
4426        ));
4427    }
4428
4429    #[test]
4430    fn test_cli_parse_schedule_add_install_alias() {
4431        let cli =
4432            Cli::try_parse_from(["pulpo", "schedule", "install", "nightly", "0 3 * * *"]).unwrap();
4433        assert!(matches!(
4434            &cli.command,
4435            Some(Commands::Schedule {
4436                action: ScheduleAction::Add { name, .. }
4437            }) if name == "nightly"
4438        ));
4439    }
4440
4441    #[test]
4442    fn test_cli_parse_schedule_list() {
4443        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
4444        assert!(matches!(
4445            &cli.command,
4446            Some(Commands::Schedule {
4447                action: ScheduleAction::List
4448            })
4449        ));
4450    }
4451
4452    #[test]
4453    fn test_cli_parse_schedule_remove() {
4454        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
4455        assert!(matches!(
4456            &cli.command,
4457            Some(Commands::Schedule {
4458                action: ScheduleAction::Remove { name }
4459            }) if name == "nightly"
4460        ));
4461    }
4462
4463    #[test]
4464    fn test_cli_parse_schedule_pause() {
4465        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
4466        assert!(matches!(
4467            &cli.command,
4468            Some(Commands::Schedule {
4469                action: ScheduleAction::Pause { name }
4470            }) if name == "nightly"
4471        ));
4472    }
4473
4474    #[test]
4475    fn test_cli_parse_schedule_resume() {
4476        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
4477        assert!(matches!(
4478            &cli.command,
4479            Some(Commands::Schedule {
4480                action: ScheduleAction::Resume { name }
4481            }) if name == "nightly"
4482        ));
4483    }
4484
4485    #[test]
4486    fn test_cli_parse_schedule_alias() {
4487        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
4488        assert!(matches!(
4489            &cli.command,
4490            Some(Commands::Schedule {
4491                action: ScheduleAction::List
4492            })
4493        ));
4494    }
4495
4496    #[test]
4497    fn test_cli_parse_schedule_list_alias() {
4498        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
4499        assert!(matches!(
4500            &cli.command,
4501            Some(Commands::Schedule {
4502                action: ScheduleAction::List
4503            })
4504        ));
4505    }
4506
4507    #[test]
4508    fn test_cli_parse_schedule_remove_alias() {
4509        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
4510        assert!(matches!(
4511            &cli.command,
4512            Some(Commands::Schedule {
4513                action: ScheduleAction::Remove { name }
4514            }) if name == "nightly"
4515        ));
4516    }
4517
4518    #[tokio::test]
4519    async fn test_execute_schedule_list_via_execute() {
4520        let node = start_test_server().await;
4521        let cli = Cli {
4522            node,
4523            token: None,
4524            command: Some(Commands::Schedule {
4525                action: ScheduleAction::List,
4526            }),
4527            path: None,
4528        };
4529        let result = execute(&cli).await.unwrap();
4530        // Under coverage, execute_schedule is a stub that returns empty string
4531        #[cfg(coverage)]
4532        assert!(result.is_empty());
4533        #[cfg(not(coverage))]
4534        assert_eq!(result, "No schedules.");
4535    }
4536
4537    #[test]
4538    fn test_schedule_action_debug() {
4539        let action = ScheduleAction::List;
4540        assert_eq!(format!("{action:?}"), "List");
4541    }
4542
4543    #[test]
4544    fn test_cli_parse_send_alias() {
4545        let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
4546        assert!(matches!(
4547            &cli.command,
4548            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
4549        ));
4550    }
4551
4552    #[test]
4553    fn test_cli_parse_spawn_no_name() {
4554        let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
4555        assert!(matches!(
4556            &cli.command,
4557            Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
4558        ));
4559    }
4560
4561    #[test]
4562    fn test_cli_parse_spawn_optional_name_with_command() {
4563        let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
4564        assert!(matches!(
4565            &cli.command,
4566            Some(Commands::Spawn { name, command, .. })
4567                if name.is_none() && command == &["echo", "hello"]
4568        ));
4569    }
4570
4571    #[test]
4572    fn test_cli_parse_path_shortcut() {
4573        let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
4574        assert!(cli.command.is_none());
4575        assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
4576    }
4577
4578    #[test]
4579    fn test_cli_parse_no_args() {
4580        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
4581        assert!(cli.command.is_none());
4582        assert!(cli.path.is_none());
4583    }
4584
4585    #[test]
4586    fn test_derive_session_name_simple() {
4587        assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
4588    }
4589
4590    #[test]
4591    fn test_derive_session_name_with_special_chars() {
4592        assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
4593    }
4594
4595    #[test]
4596    fn test_derive_session_name_root() {
4597        assert_eq!(derive_session_name("/"), "session");
4598    }
4599
4600    #[test]
4601    fn test_derive_session_name_dots() {
4602        assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
4603    }
4604
4605    #[test]
4606    fn test_resolve_path_absolute() {
4607        assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
4608    }
4609
4610    #[test]
4611    fn test_resolve_path_relative() {
4612        let resolved = resolve_path("my-repo");
4613        assert!(resolved.ends_with("my-repo"));
4614        assert!(resolved.starts_with('/'));
4615    }
4616
4617    #[tokio::test]
4618    async fn test_execute_no_args_shows_help() {
4619        let node = start_test_server().await;
4620        let cli = Cli {
4621            node,
4622            token: None,
4623            path: None,
4624            command: None,
4625        };
4626        let result = execute(&cli).await.unwrap();
4627        assert!(
4628            result.is_empty(),
4629            "no-args should return empty string after printing help"
4630        );
4631    }
4632
4633    #[tokio::test]
4634    async fn test_execute_path_shortcut() {
4635        let node = start_test_server().await;
4636        let cli = Cli {
4637            node,
4638            token: None,
4639            path: Some("/tmp".into()),
4640            command: None,
4641        };
4642        let result = execute(&cli).await.unwrap();
4643        assert!(result.contains("Detached from session"));
4644    }
4645
4646    #[tokio::test]
4647    async fn test_deduplicate_session_name_no_conflict() {
4648        // Connection refused → falls through to "name not taken" path
4649        let base = "http://127.0.0.1:1";
4650        let client = reqwest::Client::new();
4651        let name = deduplicate_session_name(&client, base, "fresh", None).await;
4652        assert_eq!(name, "fresh");
4653    }
4654
4655    #[tokio::test]
4656    async fn test_deduplicate_session_name_with_conflict() {
4657        use axum::{Router, routing::get};
4658        use std::sync::atomic::{AtomicU32, Ordering};
4659
4660        let call_count = std::sync::Arc::new(AtomicU32::new(0));
4661        let counter = call_count.clone();
4662        let app = Router::new()
4663            .route(
4664                "/api/v1/sessions/{id}",
4665                get(move || {
4666                    let c = counter.clone();
4667                    async move {
4668                        let n = c.fetch_add(1, Ordering::SeqCst);
4669                        if n == 0 {
4670                            // First call (base name) → exists
4671                            (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4672                        } else {
4673                            // Suffixed name → not found
4674                            (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4675                        }
4676                    }
4677                }),
4678            )
4679            .route(
4680                "/api/v1/peers",
4681                get(|| async {
4682                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4683                }),
4684            );
4685        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4686        let addr = listener.local_addr().unwrap();
4687        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4688        let base = format!("http://127.0.0.1:{}", addr.port());
4689        let client = reqwest::Client::new();
4690        let name = deduplicate_session_name(&client, &base, "repo", None).await;
4691        assert_eq!(name, "repo-2");
4692    }
4693
4694    // -- Node resolution tests --
4695
4696    #[test]
4697    fn test_node_needs_resolution() {
4698        assert!(!node_needs_resolution("localhost:7433"));
4699        assert!(!node_needs_resolution("mac-mini:7433"));
4700        assert!(!node_needs_resolution("10.0.0.1:7433"));
4701        assert!(!node_needs_resolution("[::1]:7433"));
4702        assert!(node_needs_resolution("mac-mini"));
4703        assert!(node_needs_resolution("linux-server"));
4704        assert!(node_needs_resolution("localhost"));
4705    }
4706
4707    #[tokio::test]
4708    async fn test_resolve_node_with_port() {
4709        let client = reqwest::Client::new();
4710        let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4711        assert_eq!(addr, "mac-mini:7433");
4712        assert!(token.is_none());
4713    }
4714
4715    #[tokio::test]
4716    async fn test_resolve_node_fallback_appends_port() {
4717        // No local daemon running on localhost:7433, so peer lookup fails
4718        // and it falls back to appending :7433
4719        let client = reqwest::Client::new();
4720        let (addr, token) = resolve_node(&client, "unknown-host").await;
4721        assert_eq!(addr, "unknown-host:7433");
4722        assert!(token.is_none());
4723    }
4724
4725    #[cfg(not(coverage))]
4726    #[tokio::test]
4727    async fn test_resolve_node_finds_peer() {
4728        use axum::{Router, routing::get};
4729
4730        let app = Router::new()
4731            .route(
4732                "/api/v1/peers",
4733                get(|| async {
4734                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4735                }),
4736            )
4737            .route(
4738                "/api/v1/config",
4739                get(|| async {
4740                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4741                }),
4742            );
4743
4744        // Port 7433 may be in use; skip test if so
4745        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4746            return;
4747        };
4748        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4749
4750        let client = reqwest::Client::new();
4751        let (addr, token) = resolve_node(&client, "mac-mini").await;
4752        assert_eq!(addr, "10.0.0.5:7433");
4753        assert_eq!(token, Some("peer-secret".into()));
4754    }
4755
4756    #[cfg(not(coverage))]
4757    #[tokio::test]
4758    async fn test_resolve_node_peer_no_token() {
4759        use axum::{Router, routing::get};
4760
4761        let app = Router::new()
4762            .route(
4763                "/api/v1/peers",
4764                get(|| async {
4765                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4766                }),
4767            )
4768            .route(
4769                "/api/v1/config",
4770                get(|| async {
4771                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4772                }),
4773            );
4774
4775        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4776            return; // Port in use, skip
4777        };
4778        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4779
4780        let client = reqwest::Client::new();
4781        let (addr, token) = resolve_node(&client, "test-peer").await;
4782        assert_eq!(addr, "10.0.0.9:7433");
4783        assert!(token.is_none()); // Simple peer entry has no token
4784    }
4785
4786    #[tokio::test]
4787    async fn test_execute_with_peer_name_resolution() {
4788        // When node doesn't contain ':', resolve_node is called.
4789        // Since there's no local daemon on port 7433, it falls back to appending :7433.
4790        // The connection to the fallback address will fail, giving us a connection error.
4791        let cli = Cli {
4792            node: "nonexistent-peer".into(),
4793            token: None,
4794            command: Some(Commands::List { all: false }),
4795            path: None,
4796        };
4797        let result = execute(&cli).await;
4798        // Should try to connect to nonexistent-peer:7433 and fail
4799        assert!(result.is_err());
4800    }
4801
4802    // -- Auto node selection tests --
4803
4804    #[test]
4805    fn test_cli_parse_spawn_auto() {
4806        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
4807        assert!(matches!(
4808            &cli.command,
4809            Some(Commands::Spawn { auto, .. }) if *auto
4810        ));
4811    }
4812
4813    #[test]
4814    fn test_cli_parse_spawn_auto_default() {
4815        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
4816        assert!(matches!(
4817            &cli.command,
4818            Some(Commands::Spawn { auto, .. }) if !auto
4819        ));
4820    }
4821
4822    #[tokio::test]
4823    async fn test_select_best_node_coverage_stub() {
4824        // Exercise the coverage stub (or real function in non-coverage builds)
4825        let client = reqwest::Client::new();
4826        // In coverage builds, the stub returns ("localhost:7433", "local")
4827        // In non-coverage builds, this fails because no server is running — that's OK
4828        let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4829    }
4830
4831    #[cfg(not(coverage))]
4832    #[tokio::test]
4833    async fn test_select_best_node_picks_least_loaded() {
4834        use axum::{Router, routing::get};
4835
4836        let app = Router::new().route(
4837            "/api/v1/peers",
4838            get(|| async {
4839                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4840            }),
4841        );
4842        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4843        let addr = listener.local_addr().unwrap();
4844        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4845        let base = format!("http://127.0.0.1:{}", addr.port());
4846
4847        let client = reqwest::Client::new();
4848        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4849        // "idle" has 1 session + 16384 MB → score = 1 - 16 = -15
4850        // "busy" has 5 sessions + 8192 MB → score = 5 - 8 = -3
4851        // idle wins (lower score)
4852        assert_eq!(name, "idle");
4853        assert_eq!(addr, "idle:7433");
4854    }
4855
4856    #[cfg(not(coverage))]
4857    #[tokio::test]
4858    async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4859        use axum::{Router, routing::get};
4860
4861        let app = Router::new().route(
4862            "/api/v1/peers",
4863            get(|| async {
4864                r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4865            }),
4866        );
4867        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4868        let addr = listener.local_addr().unwrap();
4869        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4870        let base = format!("http://127.0.0.1:{}", addr.port());
4871
4872        let client = reqwest::Client::new();
4873        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4874        assert_eq!(name, "my-mac");
4875        assert_eq!(addr, "localhost:7433");
4876    }
4877
4878    #[cfg(not(coverage))]
4879    #[tokio::test]
4880    async fn test_select_best_node_empty_peers_falls_back_to_local() {
4881        use axum::{Router, routing::get};
4882
4883        let app = Router::new().route(
4884            "/api/v1/peers",
4885            get(|| async {
4886                r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4887            }),
4888        );
4889        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4890        let addr = listener.local_addr().unwrap();
4891        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4892        let base = format!("http://127.0.0.1:{}", addr.port());
4893
4894        let client = reqwest::Client::new();
4895        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4896        assert_eq!(name, "solo");
4897        assert_eq!(addr, "localhost:7433");
4898    }
4899
4900    #[cfg(not(coverage))]
4901    #[tokio::test]
4902    async fn test_execute_spawn_auto_selects_node() {
4903        use axum::{
4904            Router,
4905            http::StatusCode,
4906            routing::{get, post},
4907        };
4908
4909        let create_json = test_create_response_json();
4910
4911        // Bind early so we know the address to embed in the peers response
4912        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4913        let addr = listener.local_addr().unwrap();
4914        let node = format!("127.0.0.1:{}", addr.port());
4915        let peer_addr = node.clone();
4916
4917        let app = Router::new()
4918            .route(
4919                "/api/v1/peers",
4920                get(move || {
4921                    let peer_addr = peer_addr.clone();
4922                    async move {
4923                        format!(
4924                            r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
4925                        )
4926                    }
4927                }),
4928            )
4929            .route(
4930                "/api/v1/sessions",
4931                post(move || async move {
4932                    (StatusCode::CREATED, create_json.clone())
4933                }),
4934            );
4935
4936        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4937
4938        let cli = Cli {
4939            node,
4940            token: None,
4941            command: Some(Commands::Spawn {
4942                name: Some("test".into()),
4943                workdir: Some("/tmp/repo".into()),
4944                ink: None,
4945                description: None,
4946                detach: true,
4947                idle_threshold: None,
4948                auto: true,
4949                worktree: false,
4950                worktree_base: None,
4951                runtime: None,
4952                secret: vec![],
4953                command: vec!["echo".into(), "hello".into()],
4954            }),
4955            path: None,
4956        };
4957        let result = execute(&cli).await.unwrap();
4958        assert!(result.contains("Created session"));
4959    }
4960
4961    #[cfg(not(coverage))]
4962    #[tokio::test]
4963    async fn test_select_best_node_peer_no_session_count() {
4964        use axum::{Router, routing::get};
4965
4966        let app = Router::new().route(
4967            "/api/v1/peers",
4968            get(|| async {
4969                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4970            }),
4971        );
4972        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4973        let addr = listener.local_addr().unwrap();
4974        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4975        let base = format!("http://127.0.0.1:{}", addr.port());
4976
4977        let client = reqwest::Client::new();
4978        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4979        // Online peer with no session_count (0) and no node_info (0 mem) → score = 0
4980        assert_eq!(name, "fresh");
4981        assert_eq!(addr, "fresh:7433");
4982    }
4983
4984    // -- Secret CLI parse tests --
4985
4986    #[test]
4987    fn test_cli_parse_secret_set() {
4988        let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_TOKEN", "abc123"]).unwrap();
4989        assert!(matches!(
4990            &cli.command,
4991            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
4992                if name == "MY_TOKEN" && value == "abc123" && env.is_none()
4993        ));
4994    }
4995
4996    #[test]
4997    fn test_cli_parse_secret_list() {
4998        let cli = Cli::try_parse_from(["pulpo", "secret", "list"]).unwrap();
4999        assert!(matches!(
5000            &cli.command,
5001            Some(Commands::Secret {
5002                action: SecretAction::List
5003            })
5004        ));
5005    }
5006
5007    #[test]
5008    fn test_cli_parse_secret_list_alias() {
5009        let cli = Cli::try_parse_from(["pulpo", "secret", "ls"]).unwrap();
5010        assert!(matches!(
5011            &cli.command,
5012            Some(Commands::Secret {
5013                action: SecretAction::List
5014            })
5015        ));
5016    }
5017
5018    #[test]
5019    fn test_cli_parse_secret_delete() {
5020        let cli = Cli::try_parse_from(["pulpo", "secret", "delete", "MY_TOKEN"]).unwrap();
5021        assert!(matches!(
5022            &cli.command,
5023            Some(Commands::Secret { action: SecretAction::Delete { name } })
5024                if name == "MY_TOKEN"
5025        ));
5026    }
5027
5028    #[test]
5029    fn test_cli_parse_secret_delete_alias() {
5030        let cli = Cli::try_parse_from(["pulpo", "secret", "rm", "MY_TOKEN"]).unwrap();
5031        assert!(matches!(
5032            &cli.command,
5033            Some(Commands::Secret { action: SecretAction::Delete { name } })
5034                if name == "MY_TOKEN"
5035        ));
5036    }
5037
5038    #[test]
5039    fn test_cli_parse_secret_alias() {
5040        let cli = Cli::try_parse_from(["pulpo", "sec", "list"]).unwrap();
5041        assert!(matches!(
5042            &cli.command,
5043            Some(Commands::Secret {
5044                action: SecretAction::List
5045            })
5046        ));
5047    }
5048
5049    #[test]
5050    fn test_format_secrets_empty() {
5051        let secrets: Vec<serde_json::Value> = vec![];
5052        assert_eq!(format_secrets(&secrets), "No secrets configured.");
5053    }
5054
5055    #[test]
5056    fn test_format_secrets_with_entries() {
5057        let secrets = vec![
5058            serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
5059            serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
5060        ];
5061        let output = format_secrets(&secrets);
5062        assert!(output.contains("GITHUB_TOKEN"));
5063        assert!(output.contains("NPM_TOKEN"));
5064        assert!(output.contains("NAME"));
5065        assert!(output.contains("ENV"));
5066        assert!(output.contains("CREATED"));
5067    }
5068
5069    #[test]
5070    fn test_format_secrets_with_env() {
5071        let secrets = vec![
5072            serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
5073            serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
5074        ];
5075        let output = format_secrets(&secrets);
5076        assert!(output.contains("GH_WORK"));
5077        assert!(output.contains("GITHUB_TOKEN"));
5078        assert!(output.contains("NPM_TOKEN"));
5079    }
5080
5081    #[test]
5082    fn test_format_secrets_short_timestamp() {
5083        let secrets = vec![serde_json::json!({"name": "KEY", "created_at": "now"})];
5084        let output = format_secrets(&secrets);
5085        assert!(output.contains("now"));
5086    }
5087
5088    #[test]
5089    fn test_format_schedules_short_last_run_at() {
5090        // Regression: last_run_at shorter than 16 chars must not panic
5091        let schedules = vec![serde_json::json!({
5092            "name": "test",
5093            "cron": "* * * * *",
5094            "enabled": true,
5095            "last_run_at": "short",
5096            "target_node": null
5097        })];
5098        let output = format_schedules(&schedules);
5099        assert!(output.contains("short"));
5100    }
5101
5102    #[test]
5103    fn test_format_sessions_multibyte_command_truncation() {
5104        use chrono::Utc;
5105        use pulpo_common::session::SessionStatus;
5106        use uuid::Uuid;
5107
5108        // Command with multi-byte chars exceeding 50 bytes; must not panic
5109        let sessions = vec![Session {
5110            id: Uuid::nil(),
5111            name: "test".into(),
5112            workdir: "/tmp".into(),
5113            command: "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'".into(),
5114            description: None,
5115            status: SessionStatus::Active,
5116            exit_code: None,
5117            backend_session_id: None,
5118            output_snapshot: None,
5119            metadata: None,
5120            ink: None,
5121            intervention_code: None,
5122            intervention_reason: None,
5123            intervention_at: None,
5124            last_output_at: None,
5125            idle_since: None,
5126            idle_threshold_secs: None,
5127            worktree_path: None,
5128            worktree_branch: None,
5129            git_branch: None,
5130            git_commit: None,
5131            git_files_changed: None,
5132            git_insertions: None,
5133            git_deletions: None,
5134            git_ahead: None,
5135            runtime: Runtime::Tmux,
5136            created_at: Utc::now(),
5137            updated_at: Utc::now(),
5138        }];
5139        let output = format_sessions(&sessions);
5140        assert!(output.contains("..."));
5141    }
5142}