Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5    AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6    PeersResponse,
7};
8#[cfg(test)]
9use pulpo_common::session::Runtime;
10use pulpo_common::session::Session;
11
12#[derive(Parser, Debug)]
13#[command(
14    name = "pulpo",
15    about = "Manage agent sessions across your machines",
16    version = env!("PULPO_VERSION"),
17    args_conflicts_with_subcommands = true
18)]
19pub struct Cli {
20    /// Target node (default: localhost)
21    #[arg(long, default_value = "localhost:7433")]
22    pub node: String,
23
24    /// Auth token (auto-discovered from local daemon if omitted)
25    #[arg(long)]
26    pub token: Option<String>,
27
28    #[command(subcommand)]
29    pub command: Option<Commands>,
30
31    /// Quick spawn: `pulpo <path>` spawns a session in that directory
32    #[arg(value_name = "PATH")]
33    pub path: Option<String>,
34}
35
36#[derive(Subcommand, Debug)]
37#[allow(clippy::large_enum_variant)]
38pub enum Commands {
39    /// Attach to a session's terminal
40    #[command(visible_alias = "a")]
41    Attach {
42        /// Session name or ID
43        name: String,
44    },
45
46    /// Send input to a session
47    #[command(visible_alias = "i", visible_alias = "send")]
48    Input {
49        /// Session name or ID
50        name: String,
51        /// Text to send (sends Enter if omitted)
52        text: Option<String>,
53    },
54
55    /// Spawn a new agent session
56    #[command(visible_alias = "s")]
57    Spawn {
58        /// Session name (auto-generated from workdir if omitted)
59        name: Option<String>,
60
61        /// Working directory (defaults to current directory)
62        #[arg(long)]
63        workdir: Option<String>,
64
65        /// Ink name (from config)
66        #[arg(long)]
67        ink: Option<String>,
68
69        /// Human-readable description of the task
70        #[arg(long)]
71        description: Option<String>,
72
73        /// Don't attach to the session after spawning
74        #[arg(short, long)]
75        detach: bool,
76
77        /// Idle threshold in seconds (0 = never idle)
78        #[arg(long)]
79        idle_threshold: Option<u32>,
80
81        /// Auto-select the least loaded node
82        #[arg(long)]
83        auto: bool,
84
85        /// Create an isolated git worktree for the session
86        #[arg(long)]
87        worktree: bool,
88
89        /// Base branch to fork the worktree from (implies --worktree)
90        #[arg(long = "worktree-base")]
91        worktree_base: Option<String>,
92
93        /// Runtime environment: tmux (default) or docker
94        #[arg(long)]
95        runtime: Option<String>,
96
97        /// Secrets to inject as environment variables (by name)
98        #[arg(long)]
99        secret: Vec<String>,
100
101        /// Command to run (everything after --)
102        #[arg(last = true)]
103        command: Vec<String>,
104    },
105
106    /// List sessions (live only by default)
107    #[command(visible_alias = "ls")]
108    List {
109        /// Show all sessions including stopped and lost
110        #[arg(short, long)]
111        all: bool,
112    },
113
114    /// Show session logs/output
115    #[command(visible_alias = "l")]
116    Logs {
117        /// Session name or ID
118        name: String,
119
120        /// Number of lines to fetch
121        #[arg(long, default_value = "100")]
122        lines: usize,
123
124        /// Follow output (like `tail -f`)
125        #[arg(short, long)]
126        follow: bool,
127    },
128
129    /// Stop one or more sessions
130    #[command(visible_alias = "k", alias = "kill")]
131    Stop {
132        /// Session names or IDs
133        #[arg(required = true)]
134        names: Vec<String>,
135
136        /// Also purge the session from history
137        #[arg(long, short = 'p')]
138        purge: bool,
139    },
140
141    /// Remove all stopped and lost sessions
142    Cleanup,
143
144    /// Resume a lost session
145    #[command(visible_alias = "r")]
146    Resume {
147        /// Session name or ID
148        name: String,
149    },
150
151    /// List all known nodes
152    #[command(visible_alias = "n")]
153    Nodes,
154
155    /// Show intervention history for a session
156    #[command(visible_alias = "iv")]
157    Interventions {
158        /// Session name or ID
159        name: String,
160    },
161
162    /// Open the web dashboard in your browser
163    Ui,
164
165    /// Manage scheduled agent runs
166    #[command(visible_alias = "sched")]
167    Schedule {
168        #[command(subcommand)]
169        action: ScheduleAction,
170    },
171
172    /// Manage secrets (environment variables injected into sessions)
173    #[command(visible_alias = "sec")]
174    Secret {
175        #[command(subcommand)]
176        action: SecretAction,
177    },
178
179    /// Manage git worktrees for sessions
180    #[command(visible_alias = "wt")]
181    Worktree {
182        #[command(subcommand)]
183        action: WorktreeAction,
184    },
185}
186
187#[derive(Subcommand, Debug)]
188pub enum SecretAction {
189    /// Set a secret
190    Set {
191        /// Secret name (will be the env var name, uppercase + underscores)
192        name: String,
193        /// Secret value
194        value: String,
195        /// Environment variable name (defaults to secret name)
196        #[arg(long)]
197        env: Option<String>,
198    },
199    /// List secret names
200    #[command(visible_alias = "ls")]
201    List,
202    /// Delete a secret
203    #[command(visible_alias = "rm")]
204    Delete {
205        /// Secret name
206        name: String,
207    },
208}
209
210#[derive(Subcommand, Debug)]
211pub enum WorktreeAction {
212    /// List sessions that use git worktrees
213    #[command(visible_alias = "ls")]
214    List,
215}
216
217#[derive(Subcommand, Debug)]
218pub enum ScheduleAction {
219    /// Add a new schedule
220    #[command(alias = "install")]
221    Add {
222        /// Schedule name
223        name: String,
224        /// Cron expression (e.g. "0 3 * * *")
225        cron: String,
226        /// Working directory
227        #[arg(long)]
228        workdir: Option<String>,
229        /// Target node (omit = local, "auto" = least-loaded)
230        #[arg(long)]
231        node: Option<String>,
232        /// Ink preset
233        #[arg(long)]
234        ink: Option<String>,
235        /// Description
236        #[arg(long)]
237        description: Option<String>,
238        /// Command to run (everything after --)
239        #[arg(last = true)]
240        command: Vec<String>,
241    },
242    /// List all schedules
243    #[command(alias = "ls")]
244    List,
245    /// Remove a schedule
246    #[command(alias = "rm")]
247    Remove {
248        /// Schedule name or ID
249        name: String,
250    },
251    /// Pause a schedule
252    Pause {
253        /// Schedule name or ID
254        name: String,
255    },
256    /// Resume a paused schedule
257    Resume {
258        /// Schedule name or ID
259        name: String,
260    },
261}
262
263/// The marker emitted by the agent wrapper when the agent process exits.
264const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
265
266/// Resolve a path to an absolute path string.
267fn resolve_path(path: &str) -> String {
268    let p = std::path::Path::new(path);
269    if p.is_absolute() {
270        path.to_owned()
271    } else {
272        std::env::current_dir().map_or_else(
273            |_| path.to_owned(),
274            |cwd| cwd.join(p).to_string_lossy().into_owned(),
275        )
276    }
277}
278
279/// Derive a session name from a directory path (basename, kebab-cased).
280fn derive_session_name(path: &str) -> String {
281    let basename = std::path::Path::new(path)
282        .file_name()
283        .and_then(|n| n.to_str())
284        .unwrap_or("session");
285    // Convert to kebab-case: lowercase, replace non-alphanumeric with hyphens, collapse
286    let kebab: String = basename
287        .chars()
288        .map(|c| {
289            if c.is_ascii_alphanumeric() {
290                c.to_ascii_lowercase()
291            } else {
292                '-'
293            }
294        })
295        .collect();
296    // Collapse consecutive hyphens and trim leading/trailing hyphens
297    let mut result = String::new();
298    for c in kebab.chars() {
299        if c == '-' && result.ends_with('-') {
300            continue;
301        }
302        result.push(c);
303    }
304    let result = result.trim_matches('-').to_owned();
305    if result.is_empty() {
306        "session".to_owned()
307    } else {
308        result
309    }
310}
311
312/// Deduplicate a session name by appending `-2`, `-3`, etc. if the base name is active.
313async fn deduplicate_session_name(
314    client: &reqwest::Client,
315    base: &str,
316    name: &str,
317    token: Option<&str>,
318) -> String {
319    // Check if the name is already taken by fetching the session
320    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
321        .send()
322        .await;
323    match resp {
324        Ok(r) if r.status().is_success() => {
325            // Session exists — try suffixed names
326            for i in 2..=99 {
327                let candidate = format!("{name}-{i}");
328                let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
329                    .send()
330                    .await;
331                match resp {
332                    Ok(r) if r.status().is_success() => {}
333                    _ => return candidate,
334                }
335            }
336            format!("{name}-100")
337        }
338        _ => name.to_owned(),
339    }
340}
341
342/// Format the base URL from the node address.
343pub fn base_url(node: &str) -> String {
344    format!("http://{node}")
345}
346
347/// Response shape for the output endpoint.
348#[derive(serde::Deserialize)]
349struct OutputResponse {
350    output: String,
351}
352
353/// Format the repo column: `basename@branch +42/-7 ↑3` with diff stats and ahead count.
354/// Truncates to 30 chars if needed.
355fn format_repo(session: &Session) -> String {
356    let basename = session
357        .workdir
358        .rsplit('/')
359        .next()
360        .unwrap_or(&session.workdir)
361        .to_owned();
362    let mut display = match session.git_branch.as_deref() {
363        Some(branch) => format!("{basename}@{branch}"),
364        None => basename,
365    };
366
367    // Append diff stats when available and non-zero
368    let ins = session.git_insertions.unwrap_or(0);
369    let del = session.git_deletions.unwrap_or(0);
370    if ins > 0 || del > 0 {
371        display = format!("{display} +{ins}/-{del}");
372    }
373
374    // Append ahead count when > 0
375    if let Some(ahead) = session.git_ahead
376        && ahead > 0
377    {
378        display = format!("{display} \u{2191}{ahead}");
379    }
380
381    if display.len() <= 30 {
382        display
383    } else {
384        let truncated: String = display.chars().take(27).collect();
385        format!("{truncated}...")
386    }
387}
388
389fn format_sessions(sessions: &[Session]) -> String {
390    if sessions.is_empty() {
391        return "No sessions.".into();
392    }
393    let mut lines = vec![format!(
394        "{:<10} {:<24} {:<12} {:<32} {}",
395        "ID", "NAME", "STATUS", "REPO", "COMMAND"
396    )];
397    for s in sessions {
398        let cmd_display = if s.command.len() > 40 {
399            let truncated: String = s.command.chars().take(37).collect();
400            format!("{truncated}...")
401        } else {
402            s.command.clone()
403        };
404        let short_id = &s.id.to_string()[..8];
405        let has_pr = s
406            .metadata
407            .as_ref()
408            .is_some_and(|m| m.contains_key("pr_url"));
409        let has_error = s
410            .metadata
411            .as_ref()
412            .is_some_and(|m| m.contains_key("error_status"));
413        let mut name_display = s.name.clone();
414        if s.worktree_path.is_some() {
415            name_display = format!("{name_display} [wt]");
416        }
417        if has_pr {
418            name_display = format!("{name_display} [PR]");
419        }
420        if has_error {
421            name_display = format!("{name_display} [!]");
422        }
423        let repo_display = format_repo(s);
424        lines.push(format!(
425            "{:<10} {:<24} {:<12} {:<32} {}",
426            short_id, name_display, s.status, repo_display, cmd_display
427        ));
428    }
429    lines.join("\n")
430}
431
432/// Format the peers response as a table.
433fn format_nodes(resp: &PeersResponse) -> String {
434    let mut lines = vec![format!(
435        "{:<20} {:<25} {:<10} {}",
436        "NAME", "ADDRESS", "STATUS", "SESSIONS"
437    )];
438    lines.push(format!(
439        "{:<20} {:<25} {:<10} {}",
440        resp.local.name, "(local)", "online", "-"
441    ));
442    for p in &resp.peers {
443        let sessions = p
444            .session_count
445            .map_or_else(|| "-".into(), |c| c.to_string());
446        lines.push(format!(
447            "{:<20} {:<25} {:<10} {}",
448            p.name, p.address, p.status, sessions
449        ));
450    }
451    lines.join("\n")
452}
453
454/// Format intervention events as a table.
455fn format_interventions(events: &[InterventionEventResponse]) -> String {
456    if events.is_empty() {
457        return "No intervention events.".into();
458    }
459    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
460    for e in events {
461        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
462    }
463    lines.join("\n")
464}
465
466/// Build the command to open a URL in the default browser.
467#[cfg_attr(coverage, allow(dead_code))]
468fn build_open_command(url: &str) -> std::process::Command {
469    #[cfg(target_os = "macos")]
470    {
471        let mut cmd = std::process::Command::new("open");
472        cmd.arg(url);
473        cmd
474    }
475    #[cfg(target_os = "linux")]
476    {
477        let mut cmd = std::process::Command::new("xdg-open");
478        cmd.arg(url);
479        cmd
480    }
481    #[cfg(target_os = "windows")]
482    {
483        let mut cmd = std::process::Command::new("cmd");
484        cmd.args(["/C", "start", url]);
485        cmd
486    }
487    #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
488    {
489        // Fallback: try xdg-open
490        let mut cmd = std::process::Command::new("xdg-open");
491        cmd.arg(url);
492        cmd
493    }
494}
495
496/// Open a URL in the default browser.
497#[cfg(not(coverage))]
498fn open_browser(url: &str) -> Result<()> {
499    build_open_command(url).status()?;
500    Ok(())
501}
502
503/// Stub for coverage builds — avoids opening a browser during tests.
504#[cfg(coverage)]
505fn open_browser(_url: &str) -> Result<()> {
506    Ok(())
507}
508
509/// Build the command to attach to a session's terminal.
510/// Detects Docker sessions by the `docker:` prefix in the backend session ID.
511#[cfg_attr(coverage, allow(dead_code))]
512fn build_attach_command(backend_session_id: &str) -> std::process::Command {
513    // Docker sessions: exec into the container
514    if let Some(container) = backend_session_id.strip_prefix("docker:") {
515        let mut cmd = std::process::Command::new("docker");
516        cmd.args(["exec", "-it", container, "/bin/sh"]);
517        return cmd;
518    }
519    // tmux sessions
520    #[cfg(not(target_os = "windows"))]
521    {
522        let mut cmd = std::process::Command::new("tmux");
523        cmd.args(["attach-session", "-t", backend_session_id]);
524        cmd
525    }
526    #[cfg(target_os = "windows")]
527    {
528        // tmux attach not available on Windows — inform the user
529        let mut cmd = std::process::Command::new("cmd");
530        cmd.args([
531            "/C",
532            "echo",
533            "Attach not available on Windows. Use the web UI or --runtime docker.",
534        ]);
535        cmd
536    }
537}
538
539/// Attach to a session's terminal.
540#[cfg(not(any(test, coverage, target_os = "windows")))]
541fn attach_session(backend_session_id: &str) -> Result<()> {
542    let status = build_attach_command(backend_session_id).status()?;
543    if !status.success() {
544        anyhow::bail!("attach failed with {status}");
545    }
546    Ok(())
547}
548
549/// Stub for Windows — tmux attach is not available.
550#[cfg(all(target_os = "windows", not(test), not(coverage)))]
551fn attach_session(_backend_session_id: &str) -> Result<()> {
552    eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
553    Ok(())
554}
555
556/// Stub for test and coverage builds — avoids spawning real terminals during tests.
557#[cfg(any(test, coverage))]
558#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
559fn attach_session(_backend_session_id: &str) -> Result<()> {
560    Ok(())
561}
562
563/// Extract a clean error message from an API JSON response (or fall back to raw text).
564fn api_error(text: &str) -> anyhow::Error {
565    serde_json::from_str::<serde_json::Value>(text)
566        .ok()
567        .and_then(|v| v["error"].as_str().map(String::from))
568        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
569}
570
571/// Return the response body text, or a clean error if the response was non-success.
572async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
573    if resp.status().is_success() {
574        Ok(resp.text().await?)
575    } else {
576        let text = resp.text().await?;
577        Err(api_error(&text))
578    }
579}
580
581/// Map a reqwest error to a user-friendly message.
582fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
583    if err.is_connect() {
584        anyhow::anyhow!(
585            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
586        )
587    } else {
588        anyhow::anyhow!("Network error connecting to {node}: {err}")
589    }
590}
591
592/// Check if the node address points to localhost.
593fn is_localhost(node: &str) -> bool {
594    let host = node.split(':').next().unwrap_or(node);
595    host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
596}
597
598/// Try to auto-discover the auth token from a local daemon.
599async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
600    let resp = client
601        .get(format!("{base}/api/v1/auth/token"))
602        .send()
603        .await
604        .ok()?;
605    let body: AuthTokenResponse = resp.json().await.ok()?;
606    if body.token.is_empty() {
607        None
608    } else {
609        Some(body.token)
610    }
611}
612
613/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
614async fn resolve_token(
615    client: &reqwest::Client,
616    base: &str,
617    node: &str,
618    explicit: Option<&str>,
619) -> Option<String> {
620    if let Some(t) = explicit {
621        return Some(t.to_owned());
622    }
623    if is_localhost(node) {
624        return discover_token(client, base).await;
625    }
626    None
627}
628
629/// Check if a node string needs resolution (no port specified).
630fn node_needs_resolution(node: &str) -> bool {
631    !node.contains(':')
632}
633
634/// Resolve a node reference to a `host:port` address.
635///
636/// If `node` looks like `host:port` (contains `:`), return as-is with no peer token.
637/// Otherwise, query the local daemon's peer registry for a matching name. If a matching
638/// online peer is found, return its address and optionally its configured auth token
639/// (from the config endpoint). Falls back to appending `:7433` if the peer is not found.
640#[cfg(not(coverage))]
641async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
642    // Already has port — use as-is
643    if !node_needs_resolution(node) {
644        return (node.to_owned(), None);
645    }
646
647    // Try to resolve via local daemon's peer registry
648    let local_base = "http://localhost:7433";
649    let mut resolved_address: Option<String> = None;
650
651    if let Ok(resp) = client
652        .get(format!("{local_base}/api/v1/peers"))
653        .send()
654        .await
655        && let Ok(peers_resp) = resp.json::<PeersResponse>().await
656    {
657        for peer in &peers_resp.peers {
658            if peer.name == node {
659                resolved_address = Some(peer.address.clone());
660                break;
661            }
662        }
663    }
664
665    let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
666
667    // Try to get the peer's auth token from the config endpoint
668    let peer_token = if let Ok(resp) = client
669        .get(format!("{local_base}/api/v1/config"))
670        .send()
671        .await
672        && let Ok(config) = resp.json::<ConfigResponse>().await
673        && let Some(entry) = config.peers.get(node)
674    {
675        entry.token().map(String::from)
676    } else {
677        None
678    };
679
680    (address, peer_token)
681}
682
683/// Coverage stub — no real HTTP resolution during coverage builds.
684#[cfg(coverage)]
685async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
686    if node_needs_resolution(node) {
687        (format!("{node}:7433"), None)
688    } else {
689        (node.to_owned(), None)
690    }
691}
692
693/// Build an authenticated GET request.
694fn authed_get(
695    client: &reqwest::Client,
696    url: String,
697    token: Option<&str>,
698) -> reqwest::RequestBuilder {
699    let req = client.get(url);
700    if let Some(t) = token {
701        req.bearer_auth(t)
702    } else {
703        req
704    }
705}
706
707/// Build an authenticated POST request.
708fn authed_post(
709    client: &reqwest::Client,
710    url: String,
711    token: Option<&str>,
712) -> reqwest::RequestBuilder {
713    let req = client.post(url);
714    if let Some(t) = token {
715        req.bearer_auth(t)
716    } else {
717        req
718    }
719}
720
721/// Build an authenticated DELETE request.
722fn authed_delete(
723    client: &reqwest::Client,
724    url: String,
725    token: Option<&str>,
726) -> reqwest::RequestBuilder {
727    let req = client.delete(url);
728    if let Some(t) = token {
729        req.bearer_auth(t)
730    } else {
731        req
732    }
733}
734
735/// Build an authenticated PUT request.
736#[cfg(not(coverage))]
737fn authed_put(
738    client: &reqwest::Client,
739    url: String,
740    token: Option<&str>,
741) -> reqwest::RequestBuilder {
742    let req = client.put(url);
743    if let Some(t) = token {
744        req.bearer_auth(t)
745    } else {
746        req
747    }
748}
749
750/// Fetch session output from the API.
751async fn fetch_output(
752    client: &reqwest::Client,
753    base: &str,
754    name: &str,
755    lines: usize,
756    token: Option<&str>,
757) -> Result<String> {
758    let resp = authed_get(
759        client,
760        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
761        token,
762    )
763    .send()
764    .await?;
765    let text = ok_or_api_error(resp).await?;
766    let output: OutputResponse = serde_json::from_str(&text)?;
767    Ok(output.output)
768}
769
770/// Fetch session status from the API.
771async fn fetch_session_status(
772    client: &reqwest::Client,
773    base: &str,
774    name: &str,
775    token: Option<&str>,
776) -> Result<String> {
777    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
778        .send()
779        .await?;
780    let text = ok_or_api_error(resp).await?;
781    let session: Session = serde_json::from_str(&text)?;
782    Ok(session.status.to_string())
783}
784
785/// Wait for the session to leave "creating" state, then check if it died instantly.
786/// Uses the session ID (not name) to avoid matching old stopped sessions with the same name.
787/// Returns an error with a helpful message if the session is lost/stopped.
788async fn check_session_alive(
789    client: &reqwest::Client,
790    base: &str,
791    session_id: &str,
792    token: Option<&str>,
793) -> Result<()> {
794    // Poll up to 3 times at 500ms intervals — handles slow daemons and Docker pull delays
795    for _ in 0..3 {
796        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
797        // Fetch by ID to avoid name collisions with old sessions
798        let resp = authed_get(
799            client,
800            format!("{base}/api/v1/sessions/{session_id}"),
801            token,
802        )
803        .send()
804        .await;
805        if let Ok(resp) = resp
806            && let Ok(text) = ok_or_api_error(resp).await
807            && let Ok(session) = serde_json::from_str::<Session>(&text)
808        {
809            match session.status.to_string().as_str() {
810                "creating" => continue,
811                "lost" | "stopped" => {
812                    anyhow::bail!(
813                        "Session \"{}\" exited immediately — the command may have failed.\n  Check logs: pulpo logs {}",
814                        session.name,
815                        session.name
816                    );
817                }
818                _ => return Ok(()),
819            }
820        }
821        // fetch failed — don't block, proceed to attach
822        break;
823    }
824    Ok(())
825}
826
827/// Compute the new trailing lines that differ from the previous output.
828///
829/// The output endpoint returns the last N lines from the terminal pane. As new lines
830/// appear, old lines at the top scroll off. We find the overlap between the end
831/// of `prev` and the beginning-to-middle of `new`, then return only the truly new
832/// trailing lines.
833fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
834    if prev.is_empty() {
835        return new;
836    }
837
838    let prev_lines: Vec<&str> = prev.lines().collect();
839    let new_lines: Vec<&str> = new.lines().collect();
840
841    if new_lines.is_empty() {
842        return "";
843    }
844
845    // prev is non-empty (early return above), so last() always succeeds
846    let last_prev = prev_lines[prev_lines.len() - 1];
847
848    // Find the last line of prev in new to determine the overlap boundary
849    for i in (0..new_lines.len()).rev() {
850        if new_lines[i] == last_prev {
851            // Verify contiguous overlap: check that lines before this match too
852            let overlap_len = prev_lines.len().min(i + 1);
853            let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
854            let new_overlap = &new_lines[i + 1 - overlap_len..=i];
855            if prev_tail == new_overlap {
856                if i + 1 < new_lines.len() {
857                    // Return the slice of `new` after the overlap
858                    let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
859                    return new.get(consumed.min(new.len())..).unwrap_or("");
860                }
861                return "";
862            }
863        }
864    }
865
866    // No overlap found — output changed completely, print it all
867    new
868}
869
870/// Follow logs by polling, printing only new output. Returns when the session ends.
871async fn follow_logs(
872    client: &reqwest::Client,
873    base: &str,
874    name: &str,
875    lines: usize,
876    token: Option<&str>,
877    writer: &mut (dyn std::io::Write + Send),
878) -> Result<()> {
879    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
880    write!(writer, "{prev_output}")?;
881
882    let mut unchanged_ticks: u32 = 0;
883
884    loop {
885        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
886
887        // Fetch latest output
888        let new_output = fetch_output(client, base, name, lines, token).await?;
889
890        let diff = diff_output(&prev_output, &new_output);
891        if diff.is_empty() {
892            unchanged_ticks += 1;
893        } else {
894            write!(writer, "{diff}")?;
895            unchanged_ticks = 0;
896        }
897
898        // Check for agent exit marker in output
899        if new_output.contains(AGENT_EXIT_MARKER) {
900            break;
901        }
902
903        prev_output = new_output;
904
905        // Only check session status when output has been unchanged for 3+ ticks
906        if unchanged_ticks >= 3 {
907            let status = fetch_session_status(client, base, name, token).await?;
908            let is_terminal = status == "ready" || status == "stopped" || status == "lost";
909            if is_terminal {
910                break;
911            }
912        }
913    }
914    Ok(())
915}
916
917// --- Schedule API ---
918
919/// Execute a schedule subcommand via the scheduler API.
920#[cfg(not(coverage))]
921async fn execute_schedule(
922    client: &reqwest::Client,
923    action: &ScheduleAction,
924    base: &str,
925    token: Option<&str>,
926) -> Result<String> {
927    match action {
928        ScheduleAction::Add {
929            name,
930            cron,
931            workdir,
932            node,
933            ink,
934            description,
935            command,
936        } => {
937            let cmd = if command.is_empty() {
938                None
939            } else {
940                Some(command.join(" "))
941            };
942            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
943                std::env::current_dir()
944                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
945            });
946            let mut body = serde_json::json!({
947                "name": name,
948                "cron": cron,
949                "workdir": resolved_workdir,
950            });
951            if let Some(c) = &cmd {
952                body["command"] = serde_json::json!(c);
953            }
954            if let Some(n) = node {
955                body["target_node"] = serde_json::json!(n);
956            }
957            if let Some(i) = ink {
958                body["ink"] = serde_json::json!(i);
959            }
960            if let Some(d) = description {
961                body["description"] = serde_json::json!(d);
962            }
963            let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
964                .json(&body)
965                .send()
966                .await?;
967            ok_or_api_error(resp).await?;
968            Ok(format!("Created schedule \"{name}\""))
969        }
970        ScheduleAction::List => {
971            let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
972                .send()
973                .await?;
974            let text = ok_or_api_error(resp).await?;
975            let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
976            Ok(format_schedules(&schedules))
977        }
978        ScheduleAction::Remove { name } => {
979            let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
980                .send()
981                .await?;
982            ok_or_api_error(resp).await?;
983            Ok(format!("Removed schedule \"{name}\""))
984        }
985        ScheduleAction::Pause { name } => {
986            let body = serde_json::json!({ "enabled": false });
987            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
988                .json(&body)
989                .send()
990                .await?;
991            ok_or_api_error(resp).await?;
992            Ok(format!("Paused schedule \"{name}\""))
993        }
994        ScheduleAction::Resume { name } => {
995            let body = serde_json::json!({ "enabled": true });
996            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
997                .json(&body)
998                .send()
999                .await?;
1000            ok_or_api_error(resp).await?;
1001            Ok(format!("Resumed schedule \"{name}\""))
1002        }
1003    }
1004}
1005
1006/// Coverage stub for schedule execution.
1007#[cfg(coverage)]
1008#[allow(clippy::unnecessary_wraps)]
1009async fn execute_schedule(
1010    _client: &reqwest::Client,
1011    _action: &ScheduleAction,
1012    _base: &str,
1013    _token: Option<&str>,
1014) -> Result<String> {
1015    Ok(String::new())
1016}
1017
1018// --- Secret API ---
1019
1020/// Format secret entries as a table.
1021#[cfg_attr(coverage, allow(dead_code))]
1022fn format_secrets(secrets: &[serde_json::Value]) -> String {
1023    if secrets.is_empty() {
1024        return "No secrets configured.".into();
1025    }
1026    let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
1027    for s in secrets {
1028        let name = s["name"].as_str().unwrap_or("?");
1029        let env_display = s["env"]
1030            .as_str()
1031            .map_or_else(|| name.to_owned(), String::from);
1032        let created = s["created_at"]
1033            .as_str()
1034            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1035        lines.push(format!("{name:<24} {env_display:<24} {created}"));
1036    }
1037    lines.join("\n")
1038}
1039
1040/// Execute a secret subcommand via the secrets API.
1041#[cfg(not(coverage))]
1042async fn execute_secret(
1043    client: &reqwest::Client,
1044    action: &SecretAction,
1045    base: &str,
1046    token: Option<&str>,
1047) -> Result<String> {
1048    match action {
1049        SecretAction::Set { name, value, env } => {
1050            let mut body = serde_json::json!({ "value": value });
1051            if let Some(e) = env {
1052                body["env"] = serde_json::json!(e);
1053            }
1054            let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
1055                .json(&body)
1056                .send()
1057                .await?;
1058            ok_or_api_error(resp).await?;
1059            Ok(format!("Secret \"{name}\" set."))
1060        }
1061        SecretAction::List => {
1062            let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
1063                .send()
1064                .await?;
1065            let text = ok_or_api_error(resp).await?;
1066            let parsed: serde_json::Value = serde_json::from_str(&text)?;
1067            let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1068            Ok(format_secrets(secrets))
1069        }
1070        SecretAction::Delete { name } => {
1071            let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1072                .send()
1073                .await?;
1074            ok_or_api_error(resp).await?;
1075            Ok(format!("Secret \"{name}\" deleted."))
1076        }
1077    }
1078}
1079
1080/// Coverage stub for secret execution.
1081#[cfg(coverage)]
1082#[allow(clippy::unnecessary_wraps)]
1083async fn execute_secret(
1084    _client: &reqwest::Client,
1085    _action: &SecretAction,
1086    _base: &str,
1087    _token: Option<&str>,
1088) -> Result<String> {
1089    Ok(String::new())
1090}
1091
1092/// Execute a worktree subcommand.
1093#[cfg(not(coverage))]
1094async fn execute_worktree(
1095    client: &reqwest::Client,
1096    action: &WorktreeAction,
1097    base: &str,
1098    token: Option<&str>,
1099) -> Result<String> {
1100    match action {
1101        WorktreeAction::List => {
1102            let resp = authed_get(client, format!("{base}/api/v1/sessions"), token)
1103                .send()
1104                .await?;
1105            let text = ok_or_api_error(resp).await?;
1106            let sessions: Vec<Session> = serde_json::from_str(&text)?;
1107            let wt_sessions: Vec<&Session> = sessions
1108                .iter()
1109                .filter(|s| s.worktree_path.is_some())
1110                .collect();
1111            Ok(format_worktree_sessions(&wt_sessions))
1112        }
1113    }
1114}
1115
1116/// Coverage stub for worktree execution.
1117#[cfg(coverage)]
1118#[allow(clippy::unnecessary_wraps)]
1119async fn execute_worktree(
1120    _client: &reqwest::Client,
1121    _action: &WorktreeAction,
1122    _base: &str,
1123    _token: Option<&str>,
1124) -> Result<String> {
1125    Ok(String::new())
1126}
1127
1128/// Format worktree sessions as a table.
1129fn format_worktree_sessions(sessions: &[&Session]) -> String {
1130    if sessions.is_empty() {
1131        return "No worktree sessions.".into();
1132    }
1133    let mut lines = vec![format!(
1134        "{:<20} {:<20} {:<10} {}",
1135        "NAME", "BRANCH", "STATUS", "PATH"
1136    )];
1137    for s in sessions {
1138        let branch = s.worktree_branch.as_deref().unwrap_or("-");
1139        let path = s.worktree_path.as_deref().unwrap_or("-");
1140        lines.push(format!(
1141            "{:<20} {:<20} {:<10} {}",
1142            s.name, branch, s.status, path
1143        ));
1144    }
1145    lines.join("\n")
1146}
1147
1148/// Format a list of schedules as a table.
1149#[cfg_attr(coverage, allow(dead_code))]
1150fn format_schedules(schedules: &[serde_json::Value]) -> String {
1151    if schedules.is_empty() {
1152        return "No schedules.".into();
1153    }
1154    let mut lines = vec![format!(
1155        "{:<20} {:<18} {:<8} {:<12} {}",
1156        "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1157    )];
1158    for s in schedules {
1159        let name = s["name"].as_str().unwrap_or("?");
1160        let cron = s["cron"].as_str().unwrap_or("?");
1161        let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1162            "yes"
1163        } else {
1164            "no"
1165        };
1166        let last_run = s["last_run_at"]
1167            .as_str()
1168            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1169        let node = s["target_node"].as_str().unwrap_or("local");
1170        lines.push(format!(
1171            "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1172        ));
1173    }
1174    lines.join("\n")
1175}
1176
1177/// Select the best node from the peer registry based on load.
1178/// Returns the node address and name of the least loaded online peer.
1179/// Scoring: lower memory usage + fewer active sessions = better.
1180#[cfg(not(coverage))]
1181async fn select_best_node(
1182    client: &reqwest::Client,
1183    base: &str,
1184    token: Option<&str>,
1185) -> Result<(String, String)> {
1186    let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1187        .send()
1188        .await?;
1189    let text = ok_or_api_error(resp).await?;
1190    let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1191
1192    // Score: fewer active sessions is better, more memory is better
1193    let mut best: Option<(String, String, f64)> = None; // (address, name, score)
1194
1195    for peer in &peers_resp.peers {
1196        if peer.status != pulpo_common::peer::PeerStatus::Online {
1197            continue;
1198        }
1199        let sessions = peer.session_count.unwrap_or(0);
1200        let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1201        // Lower score = better (fewer sessions, more memory)
1202        #[allow(clippy::cast_precision_loss)]
1203        let score = sessions as f64 - (mem as f64 / 1024.0);
1204        if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1205            best = Some((peer.address.clone(), peer.name.clone(), score));
1206        }
1207    }
1208
1209    // Fall back to local if no online peers
1210    match best {
1211        Some((addr, name, _)) => Ok((addr, name)),
1212        None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1213    }
1214}
1215
1216/// Coverage stub — auto-select always falls back to local.
1217#[cfg(coverage)]
1218#[allow(clippy::unnecessary_wraps)]
1219async fn select_best_node(
1220    _client: &reqwest::Client,
1221    _base: &str,
1222    _token: Option<&str>,
1223) -> Result<(String, String)> {
1224    Ok(("localhost:7433".into(), "local".into()))
1225}
1226
1227/// Try to start pulpod if it's not reachable on localhost.
1228/// Returns true if the daemon was started (or was already running).
1229#[cfg(not(coverage))]
1230async fn ensure_daemon_running(client: &reqwest::Client, url: &str, node: &str) -> bool {
1231    if !is_localhost(node) {
1232        return true; // Remote node — not our job to start
1233    }
1234    // Quick health check
1235    if client
1236        .get(format!("{url}/api/v1/health"))
1237        .timeout(std::time::Duration::from_secs(2))
1238        .send()
1239        .await
1240        .is_ok()
1241    {
1242        return true; // Already running
1243    }
1244
1245    eprintln!("pulpod is not running — starting it...");
1246
1247    // Try brew services first (macOS), then systemd (Linux), then direct spawn
1248    let started = if cfg!(target_os = "macos") {
1249        std::process::Command::new("brew")
1250            .args(["services", "start", "pulpo"])
1251            .stdout(std::process::Stdio::null())
1252            .stderr(std::process::Stdio::null())
1253            .status()
1254            .is_ok_and(|s| s.success())
1255    } else {
1256        std::process::Command::new("systemctl")
1257            .args(["--user", "start", "pulpo"])
1258            .stdout(std::process::Stdio::null())
1259            .stderr(std::process::Stdio::null())
1260            .status()
1261            .is_ok_and(|s| s.success())
1262    };
1263
1264    if !started {
1265        // Fallback: spawn pulpod directly in background
1266        if std::process::Command::new("pulpod")
1267            .stdout(std::process::Stdio::null())
1268            .stderr(std::process::Stdio::null())
1269            .spawn()
1270            .is_err()
1271        {
1272            eprintln!(
1273                "Failed to start pulpod. Install it with: brew install darioblanco/tap/pulpo"
1274            );
1275            return false;
1276        }
1277    }
1278
1279    // Wait for it to become reachable (up to 5 seconds)
1280    for _ in 0..10 {
1281        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1282        if client
1283            .get(format!("{url}/api/v1/health"))
1284            .timeout(std::time::Duration::from_secs(1))
1285            .send()
1286            .await
1287            .is_ok()
1288        {
1289            eprintln!("pulpod started.");
1290            return true;
1291        }
1292    }
1293    eprintln!("pulpod did not start in time.");
1294    false
1295}
1296
1297/// Coverage stub for daemon auto-start.
1298#[cfg(coverage)]
1299async fn ensure_daemon_running(_client: &reqwest::Client, _url: &str, _node: &str) -> bool {
1300    true
1301}
1302
1303/// Execute the given CLI command against the specified node.
1304#[allow(clippy::too_many_lines)]
1305pub async fn execute(cli: &Cli) -> Result<String> {
1306    let client = reqwest::Client::new();
1307    let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
1308    let url = base_url(&resolved_node);
1309    let node = &resolved_node;
1310
1311    // Auto-start pulpod if it's not running on localhost
1312    ensure_daemon_running(&client, &url, node).await;
1313
1314    let token = resolve_token(&client, &url, node, cli.token.as_deref())
1315        .await
1316        .or(peer_token);
1317
1318    // Handle `pulpo <path>` shortcut — spawn a session in the given directory
1319    if cli.command.is_none() && cli.path.is_none() {
1320        // No subcommand and no path: print help
1321        use clap::CommandFactory;
1322        let mut cmd = Cli::command();
1323        cmd.print_help()?;
1324        println!();
1325        return Ok(String::new());
1326    }
1327    if cli.command.is_none() {
1328        let path = cli.path.as_deref().unwrap_or(".");
1329        let resolved_workdir = resolve_path(path);
1330        let base_name = derive_session_name(&resolved_workdir);
1331        let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
1332        let body = serde_json::json!({
1333            "name": name,
1334            "workdir": resolved_workdir,
1335        });
1336        let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1337            .json(&body)
1338            .send()
1339            .await
1340            .map_err(|e| friendly_error(&e, node))?;
1341        let text = ok_or_api_error(resp).await?;
1342        let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1343        let msg = format!(
1344            "Created session \"{}\" ({})",
1345            resp.session.name, resp.session.id
1346        );
1347        let backend_id = resp
1348            .session
1349            .backend_session_id
1350            .as_deref()
1351            .unwrap_or(&resp.session.name);
1352        eprintln!("{msg}");
1353        // Path shortcut spawns a shell (no command) — skip liveness check
1354        // since shell sessions are immediately detected as idle by the watchdog
1355        attach_session(backend_id)?;
1356        return Ok(format!("Detached from session \"{}\".", resp.session.name));
1357    }
1358
1359    match cli.command.as_ref().unwrap() {
1360        Commands::Attach { name } => {
1361            // Fetch session to get status and backend_session_id
1362            let resp = authed_get(
1363                &client,
1364                format!("{url}/api/v1/sessions/{name}"),
1365                token.as_deref(),
1366            )
1367            .send()
1368            .await
1369            .map_err(|e| friendly_error(&e, node))?;
1370            let text = ok_or_api_error(resp).await?;
1371            let session: Session = serde_json::from_str(&text)?;
1372            match session.status.to_string().as_str() {
1373                "lost" => {
1374                    anyhow::bail!(
1375                        "Session \"{name}\" is lost (agent process died). Resume it first:\n  pulpo resume {name}"
1376                    );
1377                }
1378                "stopped" => {
1379                    anyhow::bail!(
1380                        "Session \"{name}\" is {} — cannot attach to a stopped session.",
1381                        session.status
1382                    );
1383                }
1384                _ => {}
1385            }
1386            let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1387            attach_session(&backend_id)?;
1388            Ok(format!("Detached from session {name}."))
1389        }
1390        Commands::Input { name, text } => {
1391            let input_text = text.as_deref().unwrap_or("\n");
1392            let body = serde_json::json!({ "text": input_text });
1393            let resp = authed_post(
1394                &client,
1395                format!("{url}/api/v1/sessions/{name}/input"),
1396                token.as_deref(),
1397            )
1398            .json(&body)
1399            .send()
1400            .await
1401            .map_err(|e| friendly_error(&e, node))?;
1402            ok_or_api_error(resp).await?;
1403            Ok(format!("Sent input to session {name}."))
1404        }
1405        Commands::List { all } => {
1406            let list_url = if *all {
1407                format!("{url}/api/v1/sessions")
1408            } else {
1409                format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
1410            };
1411            let resp = authed_get(&client, list_url, token.as_deref())
1412                .send()
1413                .await
1414                .map_err(|e| friendly_error(&e, node))?;
1415            let text = ok_or_api_error(resp).await?;
1416            let sessions: Vec<Session> = serde_json::from_str(&text)?;
1417            Ok(format_sessions(&sessions))
1418        }
1419        Commands::Nodes => {
1420            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1421                .send()
1422                .await
1423                .map_err(|e| friendly_error(&e, node))?;
1424            let text = ok_or_api_error(resp).await?;
1425            let resp: PeersResponse = serde_json::from_str(&text)?;
1426            Ok(format_nodes(&resp))
1427        }
1428        Commands::Spawn {
1429            workdir,
1430            name,
1431            ink,
1432            description,
1433            detach,
1434            idle_threshold,
1435            auto,
1436            worktree,
1437            worktree_base,
1438            runtime,
1439            secret,
1440            command,
1441        } => {
1442            let cmd = if command.is_empty() {
1443                None
1444            } else {
1445                Some(command.join(" "))
1446            };
1447            // Resolve workdir: --workdir flag > current directory
1448            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1449                std::env::current_dir()
1450                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1451            });
1452            // Resolve name: explicit > derived from workdir (with dedup)
1453            let resolved_name = if let Some(n) = name {
1454                n.clone()
1455            } else {
1456                let base_name = derive_session_name(&resolved_workdir);
1457                deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1458            };
1459            let mut body = serde_json::json!({
1460                "name": resolved_name,
1461                "workdir": resolved_workdir,
1462            });
1463            if let Some(c) = &cmd {
1464                body["command"] = serde_json::json!(c);
1465            }
1466            if let Some(i) = ink {
1467                body["ink"] = serde_json::json!(i);
1468            }
1469            if let Some(d) = description {
1470                body["description"] = serde_json::json!(d);
1471            }
1472            if let Some(t) = idle_threshold {
1473                body["idle_threshold_secs"] = serde_json::json!(t);
1474            }
1475            // --base-branch implies --worktree
1476            if *worktree || worktree_base.is_some() {
1477                body["worktree"] = serde_json::json!(true);
1478                if let Some(base) = worktree_base {
1479                    body["worktree_base"] = serde_json::json!(base);
1480                    eprintln!(
1481                        "Worktree: branch {resolved_name} (from {base}) in ~/.pulpo/worktrees/{resolved_name}/"
1482                    );
1483                } else {
1484                    eprintln!(
1485                        "Worktree: branch {resolved_name} in ~/.pulpo/worktrees/{resolved_name}/"
1486                    );
1487                }
1488            }
1489            if let Some(rt) = runtime {
1490                body["runtime"] = serde_json::json!(rt);
1491            }
1492            if !secret.is_empty() {
1493                body["secrets"] = serde_json::json!(secret);
1494            }
1495            let spawn_url = if *auto {
1496                let (auto_addr, auto_name) =
1497                    select_best_node(&client, &url, token.as_deref()).await?;
1498                eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1499                base_url(&auto_addr)
1500            } else {
1501                url.clone()
1502            };
1503            let resp = authed_post(
1504                &client,
1505                format!("{spawn_url}/api/v1/sessions"),
1506                token.as_deref(),
1507            )
1508            .json(&body)
1509            .send()
1510            .await
1511            .map_err(|e| friendly_error(&e, node))?;
1512            let text = ok_or_api_error(resp).await?;
1513            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1514            let msg = format!(
1515                "Created session \"{}\" ({})",
1516                resp.session.name, resp.session.id
1517            );
1518            if !detach {
1519                let backend_id = resp
1520                    .session
1521                    .backend_session_id
1522                    .as_deref()
1523                    .unwrap_or(&resp.session.name);
1524                eprintln!("{msg}");
1525                // Only check liveness for explicit commands — shell sessions (no command)
1526                // may be immediately marked idle/stopped by the watchdog, which is expected
1527                if cmd.is_some() {
1528                    let sid = resp.session.id.to_string();
1529                    check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1530                }
1531                attach_session(backend_id)?;
1532                return Ok(format!("Detached from session \"{}\".", resp.session.name));
1533            }
1534            Ok(msg)
1535        }
1536        Commands::Stop { names, purge } => {
1537            let mut results = Vec::new();
1538            for name in names {
1539                let query = if *purge { "?purge=true" } else { "" };
1540                let resp = authed_post(
1541                    &client,
1542                    format!("{url}/api/v1/sessions/{name}/stop{query}"),
1543                    token.as_deref(),
1544                )
1545                .send()
1546                .await
1547                .map_err(|e| friendly_error(&e, node))?;
1548                let action = if *purge {
1549                    "stopped and purged"
1550                } else {
1551                    "stopped"
1552                };
1553                match ok_or_api_error(resp).await {
1554                    Ok(_) => results.push(format!("Session {name} {action}.")),
1555                    Err(e) => results.push(format!("Error stopping {name}: {e}")),
1556                }
1557            }
1558            Ok(results.join("\n"))
1559        }
1560        Commands::Cleanup => {
1561            let resp = authed_post(
1562                &client,
1563                format!("{url}/api/v1/sessions/cleanup"),
1564                token.as_deref(),
1565            )
1566            .send()
1567            .await
1568            .map_err(|e| friendly_error(&e, node))?;
1569            let text = ok_or_api_error(resp).await?;
1570            let result: serde_json::Value = serde_json::from_str(&text)?;
1571            let count = result["deleted"].as_u64().unwrap_or(0);
1572            if count == 0 {
1573                Ok("No stopped or lost sessions to clean up.".into())
1574            } else {
1575                Ok(format!("Cleaned up {count} session(s)."))
1576            }
1577        }
1578        Commands::Logs {
1579            name,
1580            lines,
1581            follow,
1582        } => {
1583            if *follow {
1584                let mut stdout = std::io::stdout();
1585                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1586                    .await
1587                    .map_err(|e| {
1588                        // Unwrap reqwest errors to friendly messages
1589                        match e.downcast::<reqwest::Error>() {
1590                            Ok(re) => friendly_error(&re, node),
1591                            Err(other) => other,
1592                        }
1593                    })?;
1594                Ok(String::new())
1595            } else {
1596                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1597                    .await
1598                    .map_err(|e| match e.downcast::<reqwest::Error>() {
1599                        Ok(re) => friendly_error(&re, node),
1600                        Err(other) => other,
1601                    })?;
1602                Ok(output)
1603            }
1604        }
1605        Commands::Interventions { name } => {
1606            let resp = authed_get(
1607                &client,
1608                format!("{url}/api/v1/sessions/{name}/interventions"),
1609                token.as_deref(),
1610            )
1611            .send()
1612            .await
1613            .map_err(|e| friendly_error(&e, node))?;
1614            let text = ok_or_api_error(resp).await?;
1615            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1616            Ok(format_interventions(&events))
1617        }
1618        Commands::Ui => {
1619            let dashboard = base_url(node);
1620            open_browser(&dashboard)?;
1621            Ok(format!("Opening {dashboard}"))
1622        }
1623        Commands::Resume { name } => {
1624            let resp = authed_post(
1625                &client,
1626                format!("{url}/api/v1/sessions/{name}/resume"),
1627                token.as_deref(),
1628            )
1629            .send()
1630            .await
1631            .map_err(|e| friendly_error(&e, node))?;
1632            let text = ok_or_api_error(resp).await?;
1633            let session: Session = serde_json::from_str(&text)?;
1634            let backend_id = session
1635                .backend_session_id
1636                .as_deref()
1637                .unwrap_or(&session.name);
1638            eprintln!("Resumed session \"{}\"", session.name);
1639            let sid = session.id.to_string();
1640            check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1641            attach_session(backend_id)?;
1642            Ok(format!("Detached from session \"{}\".", session.name))
1643        }
1644        Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1645            .await
1646            .map_err(|e| match e.downcast::<reqwest::Error>() {
1647                Ok(re) => friendly_error(&re, node),
1648                Err(other) => other,
1649            }),
1650        Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
1651            .await
1652            .map_err(|e| match e.downcast::<reqwest::Error>() {
1653                Ok(re) => friendly_error(&re, node),
1654                Err(other) => other,
1655            }),
1656        Commands::Worktree { action } => execute_worktree(&client, action, &url, token.as_deref())
1657            .await
1658            .map_err(|e| match e.downcast::<reqwest::Error>() {
1659                Ok(re) => friendly_error(&re, node),
1660                Err(other) => other,
1661            }),
1662    }
1663}
1664
1665#[cfg(test)]
1666mod tests {
1667    use super::*;
1668
1669    /// Helper to build a minimal `Session` for `format_repo` tests.
1670    fn repo_session(workdir: &str, branch: Option<&str>) -> Session {
1671        Session {
1672            id: uuid::Uuid::nil(),
1673            name: "test".into(),
1674            workdir: workdir.into(),
1675            command: "echo".into(),
1676            description: None,
1677            status: pulpo_common::session::SessionStatus::Active,
1678            exit_code: None,
1679            backend_session_id: None,
1680            output_snapshot: None,
1681            metadata: None,
1682            ink: None,
1683            intervention_code: None,
1684            intervention_reason: None,
1685            intervention_at: None,
1686            last_output_at: None,
1687            idle_since: None,
1688            idle_threshold_secs: None,
1689            worktree_path: None,
1690            worktree_branch: None,
1691            git_branch: branch.map(Into::into),
1692            git_commit: None,
1693            git_files_changed: None,
1694            git_insertions: None,
1695            git_deletions: None,
1696            git_ahead: None,
1697            runtime: Runtime::Tmux,
1698            created_at: chrono::Utc::now(),
1699            updated_at: chrono::Utc::now(),
1700        }
1701    }
1702
1703    #[test]
1704    fn test_base_url() {
1705        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1706        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1707    }
1708
1709    #[test]
1710    fn test_cli_parse_list() {
1711        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1712        assert_eq!(cli.node, "localhost:7433");
1713        assert!(matches!(cli.command, Some(Commands::List { .. })));
1714    }
1715
1716    #[test]
1717    fn test_cli_parse_nodes() {
1718        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1719        assert!(matches!(cli.command, Some(Commands::Nodes)));
1720    }
1721
1722    #[test]
1723    fn test_cli_parse_ui() {
1724        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1725        assert!(matches!(cli.command, Some(Commands::Ui)));
1726    }
1727
1728    #[test]
1729    fn test_cli_parse_ui_custom_node() {
1730        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1731        // With args_conflicts_with_subcommands, "ui" is parsed as path when --node is explicit
1732        assert_eq!(cli.node, "mac-mini:7433");
1733        assert_eq!(cli.path.as_deref(), Some("ui"));
1734    }
1735
1736    #[test]
1737    fn test_build_open_command() {
1738        let cmd = build_open_command("http://localhost:7433");
1739        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1740        assert_eq!(args, vec!["http://localhost:7433"]);
1741        #[cfg(target_os = "macos")]
1742        assert_eq!(cmd.get_program(), "open");
1743        #[cfg(target_os = "linux")]
1744        assert_eq!(cmd.get_program(), "xdg-open");
1745    }
1746
1747    #[test]
1748    fn test_cli_parse_spawn() {
1749        let cli = Cli::try_parse_from([
1750            "pulpo",
1751            "spawn",
1752            "my-task",
1753            "--workdir",
1754            "/tmp/repo",
1755            "--",
1756            "claude",
1757            "-p",
1758            "Fix the bug",
1759        ])
1760        .unwrap();
1761        assert!(matches!(
1762            &cli.command,
1763            Some(Commands::Spawn { name, workdir, command, .. })
1764                if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1765                && command == &["claude", "-p", "Fix the bug"]
1766        ));
1767    }
1768
1769    #[test]
1770    fn test_cli_parse_spawn_with_ink() {
1771        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1772        assert!(matches!(
1773            &cli.command,
1774            Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1775        ));
1776    }
1777
1778    #[test]
1779    fn test_cli_parse_spawn_with_description() {
1780        let cli =
1781            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1782                .unwrap();
1783        assert!(matches!(
1784            &cli.command,
1785            Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1786        ));
1787    }
1788
1789    #[test]
1790    fn test_cli_parse_spawn_name_positional() {
1791        let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1792        assert!(matches!(
1793            &cli.command,
1794            Some(Commands::Spawn { name, command, .. })
1795                if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1796        ));
1797    }
1798
1799    #[test]
1800    fn test_cli_parse_spawn_no_command() {
1801        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1802        assert!(matches!(
1803            &cli.command,
1804            Some(Commands::Spawn { command, .. }) if command.is_empty()
1805        ));
1806    }
1807
1808    #[test]
1809    fn test_cli_parse_spawn_idle_threshold() {
1810        let cli =
1811            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1812        assert!(matches!(
1813            &cli.command,
1814            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1815        ));
1816    }
1817
1818    #[test]
1819    fn test_cli_parse_spawn_idle_threshold_60() {
1820        let cli =
1821            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "60"]).unwrap();
1822        assert!(matches!(
1823            &cli.command,
1824            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(60)
1825        ));
1826    }
1827
1828    #[test]
1829    fn test_cli_parse_spawn_secrets() {
1830        let cli = Cli::try_parse_from([
1831            "pulpo",
1832            "spawn",
1833            "my-task",
1834            "--secret",
1835            "GITHUB_TOKEN",
1836            "--secret",
1837            "NPM_TOKEN",
1838        ])
1839        .unwrap();
1840        assert!(matches!(
1841            &cli.command,
1842            Some(Commands::Spawn { secret, .. }) if secret == &["GITHUB_TOKEN", "NPM_TOKEN"]
1843        ));
1844    }
1845
1846    #[test]
1847    fn test_cli_parse_spawn_no_secrets() {
1848        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1849        assert!(matches!(
1850            &cli.command,
1851            Some(Commands::Spawn { secret, .. }) if secret.is_empty()
1852        ));
1853    }
1854
1855    #[test]
1856    fn test_cli_parse_secret_set_with_env() {
1857        let cli = Cli::try_parse_from([
1858            "pulpo",
1859            "secret",
1860            "set",
1861            "GH_WORK",
1862            "token123",
1863            "--env",
1864            "GITHUB_TOKEN",
1865        ])
1866        .unwrap();
1867        assert!(matches!(
1868            &cli.command,
1869            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1870                if name == "GH_WORK" && value == "token123" && env.as_deref() == Some("GITHUB_TOKEN")
1871        ));
1872    }
1873
1874    #[test]
1875    fn test_cli_parse_secret_set_without_env() {
1876        let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_KEY", "val"]).unwrap();
1877        assert!(matches!(
1878            &cli.command,
1879            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1880                if name == "MY_KEY" && value == "val" && env.is_none()
1881        ));
1882    }
1883
1884    #[test]
1885    fn test_cli_parse_spawn_worktree() {
1886        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree"]).unwrap();
1887        assert!(matches!(
1888            &cli.command,
1889            Some(Commands::Spawn { worktree, .. }) if *worktree
1890        ));
1891    }
1892
1893    #[test]
1894    fn test_cli_parse_spawn_worktree_base() {
1895        let cli = Cli::try_parse_from([
1896            "pulpo",
1897            "spawn",
1898            "my-task",
1899            "--worktree",
1900            "--worktree-base",
1901            "main",
1902        ])
1903        .unwrap();
1904        assert!(matches!(
1905            &cli.command,
1906            Some(Commands::Spawn { worktree, worktree_base, .. })
1907                if *worktree && worktree_base.as_deref() == Some("main")
1908        ));
1909    }
1910
1911    #[test]
1912    fn test_cli_parse_spawn_worktree_base_implies_worktree() {
1913        // --worktree-base without --worktree should still parse (implied at execute time)
1914        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree-base", "develop"])
1915            .unwrap();
1916        assert!(matches!(
1917            &cli.command,
1918            Some(Commands::Spawn { worktree_base, .. })
1919                if worktree_base.as_deref() == Some("develop")
1920        ));
1921    }
1922
1923    #[test]
1924    fn test_cli_parse_worktree_list() {
1925        let cli = Cli::try_parse_from(["pulpo", "worktree", "list"]).unwrap();
1926        assert!(matches!(
1927            &cli.command,
1928            Some(Commands::Worktree {
1929                action: WorktreeAction::List
1930            })
1931        ));
1932    }
1933
1934    #[test]
1935    fn test_cli_parse_wt_alias() {
1936        let cli = Cli::try_parse_from(["pulpo", "wt", "list"]).unwrap();
1937        assert!(matches!(
1938            &cli.command,
1939            Some(Commands::Worktree {
1940                action: WorktreeAction::List
1941            })
1942        ));
1943    }
1944
1945    #[test]
1946    fn test_cli_parse_worktree_list_ls_alias() {
1947        let cli = Cli::try_parse_from(["pulpo", "wt", "ls"]).unwrap();
1948        assert!(matches!(
1949            &cli.command,
1950            Some(Commands::Worktree {
1951                action: WorktreeAction::List
1952            })
1953        ));
1954    }
1955
1956    #[test]
1957    fn test_format_worktree_sessions_empty() {
1958        let output = format_worktree_sessions(&[]);
1959        assert_eq!(output, "No worktree sessions.");
1960    }
1961
1962    #[test]
1963    fn test_format_worktree_sessions_with_data() {
1964        use chrono::Utc;
1965        use pulpo_common::session::SessionStatus;
1966        use uuid::Uuid;
1967
1968        let session = Session {
1969            id: Uuid::nil(),
1970            name: "fix-auth".into(),
1971            workdir: "/tmp/repo".into(),
1972            command: "claude -p 'fix auth'".into(),
1973            description: None,
1974            status: SessionStatus::Active,
1975            exit_code: None,
1976            backend_session_id: None,
1977            output_snapshot: None,
1978            metadata: None,
1979            ink: None,
1980            intervention_code: None,
1981            intervention_reason: None,
1982            intervention_at: None,
1983            last_output_at: None,
1984            idle_since: None,
1985            idle_threshold_secs: None,
1986            worktree_path: Some("/home/user/.pulpo/worktrees/fix-auth".into()),
1987            worktree_branch: Some("fix-auth".into()),
1988            git_branch: None,
1989            git_commit: None,
1990            git_files_changed: None,
1991            git_insertions: None,
1992            git_deletions: None,
1993            git_ahead: None,
1994            runtime: Runtime::Tmux,
1995            created_at: Utc::now(),
1996            updated_at: Utc::now(),
1997        };
1998        let sessions = vec![&session];
1999        let output = format_worktree_sessions(&sessions);
2000        assert!(output.contains("fix-auth"), "should show name: {output}");
2001        assert!(output.contains("active"), "should show status: {output}");
2002        assert!(
2003            output.contains("/home/user/.pulpo/worktrees/fix-auth"),
2004            "should show path: {output}"
2005        );
2006        assert!(output.contains("BRANCH"), "should have header: {output}");
2007    }
2008
2009    #[test]
2010    fn test_format_worktree_sessions_no_branch() {
2011        use chrono::Utc;
2012        use pulpo_common::session::SessionStatus;
2013        use uuid::Uuid;
2014
2015        let session = Session {
2016            id: Uuid::nil(),
2017            name: "old-session".into(),
2018            workdir: "/tmp".into(),
2019            command: "echo".into(),
2020            description: None,
2021            status: SessionStatus::Active,
2022            exit_code: None,
2023            backend_session_id: None,
2024            output_snapshot: None,
2025            metadata: None,
2026            ink: None,
2027            intervention_code: None,
2028            intervention_reason: None,
2029            intervention_at: None,
2030            last_output_at: None,
2031            idle_since: None,
2032            idle_threshold_secs: None,
2033            worktree_path: Some("/home/user/.pulpo/worktrees/old-session".into()),
2034            worktree_branch: None,
2035            git_branch: None,
2036            git_commit: None,
2037            git_files_changed: None,
2038            git_insertions: None,
2039            git_deletions: None,
2040            git_ahead: None,
2041            runtime: Runtime::Tmux,
2042            created_at: Utc::now(),
2043            updated_at: Utc::now(),
2044        };
2045        let sessions = vec![&session];
2046        let output = format_worktree_sessions(&sessions);
2047        assert!(
2048            output.contains('-'),
2049            "branch should show dash when None: {output}"
2050        );
2051    }
2052
2053    #[test]
2054    fn test_cli_parse_spawn_detach() {
2055        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
2056        assert!(matches!(
2057            &cli.command,
2058            Some(Commands::Spawn { detach, .. }) if *detach
2059        ));
2060    }
2061
2062    #[test]
2063    fn test_cli_parse_spawn_detach_short() {
2064        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
2065        assert!(matches!(
2066            &cli.command,
2067            Some(Commands::Spawn { detach, .. }) if *detach
2068        ));
2069    }
2070
2071    #[test]
2072    fn test_cli_parse_spawn_detach_default() {
2073        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
2074        assert!(matches!(
2075            &cli.command,
2076            Some(Commands::Spawn { detach, .. }) if !detach
2077        ));
2078    }
2079
2080    #[test]
2081    fn test_cli_parse_logs() {
2082        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
2083        assert!(matches!(
2084            &cli.command,
2085            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 100 && !follow
2086        ));
2087    }
2088
2089    #[test]
2090    fn test_cli_parse_logs_with_lines() {
2091        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
2092        assert!(matches!(
2093            &cli.command,
2094            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 50 && !follow
2095        ));
2096    }
2097
2098    #[test]
2099    fn test_cli_parse_logs_follow() {
2100        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
2101        assert!(matches!(
2102            &cli.command,
2103            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
2104        ));
2105    }
2106
2107    #[test]
2108    fn test_cli_parse_logs_follow_short() {
2109        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
2110        assert!(matches!(
2111            &cli.command,
2112            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
2113        ));
2114    }
2115
2116    #[test]
2117    fn test_cli_parse_stop() {
2118        let cli = Cli::try_parse_from(["pulpo", "stop", "my-session"]).unwrap();
2119        assert!(matches!(
2120            &cli.command,
2121            Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
2122        ));
2123    }
2124
2125    #[test]
2126    fn test_cli_parse_stop_purge() {
2127        let cli = Cli::try_parse_from(["pulpo", "stop", "my-session", "--purge"]).unwrap();
2128        assert!(matches!(
2129            &cli.command,
2130            Some(Commands::Stop { names, purge }) if names == &["my-session"] && *purge
2131        ));
2132
2133        let cli = Cli::try_parse_from(["pulpo", "stop", "my-session", "-p"]).unwrap();
2134        assert!(matches!(
2135            &cli.command,
2136            Some(Commands::Stop { names, purge }) if names == &["my-session"] && *purge
2137        ));
2138    }
2139
2140    #[test]
2141    fn test_cli_parse_kill_alias() {
2142        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
2143        assert!(matches!(
2144            &cli.command,
2145            Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
2146        ));
2147    }
2148
2149    #[test]
2150    fn test_cli_parse_resume() {
2151        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
2152        assert!(matches!(
2153            &cli.command,
2154            Some(Commands::Resume { name }) if name == "my-session"
2155        ));
2156    }
2157
2158    #[test]
2159    fn test_cli_parse_input() {
2160        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
2161        assert!(matches!(
2162            &cli.command,
2163            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("yes")
2164        ));
2165    }
2166
2167    #[test]
2168    fn test_cli_parse_input_no_text() {
2169        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
2170        assert!(matches!(
2171            &cli.command,
2172            Some(Commands::Input { name, text }) if name == "my-session" && text.is_none()
2173        ));
2174    }
2175
2176    #[test]
2177    fn test_cli_parse_input_alias() {
2178        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
2179        assert!(matches!(
2180            &cli.command,
2181            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
2182        ));
2183    }
2184
2185    #[test]
2186    fn test_cli_parse_custom_node() {
2187        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
2188        assert_eq!(cli.node, "win-pc:8080");
2189        // With args_conflicts_with_subcommands, "list" is parsed as path when --node is explicit
2190        assert_eq!(cli.path.as_deref(), Some("list"));
2191    }
2192
2193    #[test]
2194    fn test_cli_version() {
2195        let result = Cli::try_parse_from(["pulpo", "--version"]);
2196        // clap exits with an error (kind DisplayVersion) when --version is used
2197        let err = result.unwrap_err();
2198        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
2199    }
2200
2201    #[test]
2202    fn test_cli_parse_no_subcommand_succeeds() {
2203        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
2204        assert!(cli.command.is_none());
2205        assert!(cli.path.is_none());
2206    }
2207
2208    #[test]
2209    fn test_cli_debug() {
2210        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2211        let debug = format!("{cli:?}");
2212        assert!(debug.contains("List"));
2213    }
2214
2215    #[test]
2216    fn test_commands_debug() {
2217        let cmd = Commands::List { all: false };
2218        assert_eq!(format!("{cmd:?}"), "List { all: false }");
2219    }
2220
2221    /// A valid Session JSON for test responses.
2222    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2223
2224    /// A valid `CreateSessionResponse` JSON wrapping the session.
2225    fn test_create_response_json() -> String {
2226        format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
2227    }
2228
2229    /// Start a lightweight test HTTP server and return its address.
2230    async fn start_test_server() -> String {
2231        use axum::http::StatusCode;
2232        use axum::{
2233            Json, Router,
2234            routing::{get, post},
2235        };
2236
2237        let create_json = test_create_response_json();
2238
2239        let app = Router::new()
2240            .route(
2241                "/api/v1/sessions",
2242                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
2243                    (StatusCode::CREATED, create_json.clone())
2244                }),
2245            )
2246            .route(
2247                "/api/v1/sessions/{id}",
2248                get(|| async { TEST_SESSION_JSON.to_owned() }),
2249            )
2250            .route(
2251                "/api/v1/sessions/{id}/stop",
2252                post(|| async { StatusCode::NO_CONTENT }),
2253            )
2254            .route(
2255                "/api/v1/sessions/{id}/output",
2256                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
2257            )
2258            .route(
2259                "/api/v1/peers",
2260                get(|| async {
2261                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
2262                }),
2263            )
2264            .route(
2265                "/api/v1/sessions/{id}/resume",
2266                axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
2267            )
2268            .route(
2269                "/api/v1/sessions/{id}/interventions",
2270                get(|| async { "[]".to_owned() }),
2271            )
2272            .route(
2273                "/api/v1/sessions/{id}/input",
2274                post(|| async { StatusCode::NO_CONTENT }),
2275            )
2276            .route(
2277                "/api/v1/schedules",
2278                get(|| async { Json::<Vec<()>>(vec![]) })
2279                    .post(|| async { StatusCode::CREATED }),
2280            )
2281            .route(
2282                "/api/v1/schedules/{id}",
2283                axum::routing::put(|| async { StatusCode::OK })
2284                    .delete(|| async { StatusCode::NO_CONTENT }),
2285            );
2286
2287        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2288        let addr = listener.local_addr().unwrap();
2289        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2290        format!("127.0.0.1:{}", addr.port())
2291    }
2292
2293    #[tokio::test]
2294    async fn test_execute_list_success() {
2295        let node = start_test_server().await;
2296        let cli = Cli {
2297            node,
2298            token: None,
2299            command: Some(Commands::List { all: false }),
2300            path: None,
2301        };
2302        let result = execute(&cli).await.unwrap();
2303        assert_eq!(result, "No sessions.");
2304    }
2305
2306    #[tokio::test]
2307    async fn test_execute_nodes_success() {
2308        let node = start_test_server().await;
2309        let cli = Cli {
2310            node,
2311            token: None,
2312            command: Some(Commands::Nodes),
2313            path: None,
2314        };
2315        let result = execute(&cli).await.unwrap();
2316        assert!(result.contains("test"));
2317        assert!(result.contains("(local)"));
2318        assert!(result.contains("NAME"));
2319    }
2320
2321    #[tokio::test]
2322    async fn test_execute_spawn_success() {
2323        let node = start_test_server().await;
2324        let cli = Cli {
2325            node,
2326            token: None,
2327            command: Some(Commands::Spawn {
2328                name: Some("test".into()),
2329                workdir: Some("/tmp/repo".into()),
2330                ink: None,
2331                description: None,
2332                detach: true,
2333                idle_threshold: None,
2334                auto: false,
2335                worktree: false,
2336                worktree_base: None,
2337                runtime: None,
2338                secret: vec![],
2339                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2340            }),
2341            path: None,
2342        };
2343        let result = execute(&cli).await.unwrap();
2344        assert!(result.contains("Created session"));
2345        assert!(result.contains("repo"));
2346    }
2347
2348    #[tokio::test]
2349    async fn test_execute_spawn_with_all_flags() {
2350        let node = start_test_server().await;
2351        let cli = Cli {
2352            node,
2353            token: None,
2354            command: Some(Commands::Spawn {
2355                name: Some("test".into()),
2356                workdir: Some("/tmp/repo".into()),
2357                ink: Some("coder".into()),
2358                description: Some("Fix the bug".into()),
2359                detach: true,
2360                idle_threshold: None,
2361                auto: false,
2362                worktree: false,
2363                worktree_base: None,
2364                runtime: None,
2365                secret: vec![],
2366                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2367            }),
2368            path: None,
2369        };
2370        let result = execute(&cli).await.unwrap();
2371        assert!(result.contains("Created session"));
2372    }
2373
2374    #[tokio::test]
2375    async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
2376        let node = start_test_server().await;
2377        let cli = Cli {
2378            node,
2379            token: None,
2380            command: Some(Commands::Spawn {
2381                name: Some("full-opts".into()),
2382                workdir: Some("/tmp/repo".into()),
2383                ink: Some("coder".into()),
2384                description: Some("Full options".into()),
2385                detach: true,
2386                idle_threshold: Some(120),
2387                auto: false,
2388                worktree: true,
2389                worktree_base: None,
2390                runtime: Some("docker".into()),
2391                secret: vec![],
2392                command: vec!["claude".into()],
2393            }),
2394            path: None,
2395        };
2396        let result = execute(&cli).await.unwrap();
2397        assert!(result.contains("Created session"));
2398    }
2399
2400    #[tokio::test]
2401    async fn test_execute_spawn_no_name_derives_from_workdir() {
2402        let node = start_test_server().await;
2403        let cli = Cli {
2404            node,
2405            token: None,
2406            command: Some(Commands::Spawn {
2407                name: None,
2408                workdir: Some("/tmp/my-project".into()),
2409                ink: None,
2410                description: None,
2411                detach: true,
2412                idle_threshold: None,
2413                auto: false,
2414                worktree: false,
2415                worktree_base: None,
2416                runtime: None,
2417                secret: vec![],
2418                command: vec!["echo".into(), "hello".into()],
2419            }),
2420            path: None,
2421        };
2422        let result = execute(&cli).await.unwrap();
2423        assert!(result.contains("Created session"));
2424    }
2425
2426    #[tokio::test]
2427    async fn test_execute_spawn_no_command() {
2428        let node = start_test_server().await;
2429        let cli = Cli {
2430            node,
2431            token: None,
2432            command: Some(Commands::Spawn {
2433                name: Some("test".into()),
2434                workdir: Some("/tmp/repo".into()),
2435                ink: None,
2436                description: None,
2437                detach: true,
2438                idle_threshold: None,
2439                auto: false,
2440                worktree: false,
2441                worktree_base: None,
2442                runtime: None,
2443                secret: vec![],
2444                command: vec![],
2445            }),
2446            path: None,
2447        };
2448        let result = execute(&cli).await.unwrap();
2449        assert!(result.contains("Created session"));
2450    }
2451
2452    #[tokio::test]
2453    async fn test_execute_spawn_with_name() {
2454        let node = start_test_server().await;
2455        let cli = Cli {
2456            node,
2457            token: None,
2458            command: Some(Commands::Spawn {
2459                name: Some("my-task".into()),
2460                workdir: Some("/tmp/repo".into()),
2461                ink: None,
2462                description: None,
2463                detach: true,
2464                idle_threshold: None,
2465                auto: false,
2466                worktree: false,
2467                worktree_base: None,
2468                runtime: None,
2469                secret: vec![],
2470                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2471            }),
2472            path: None,
2473        };
2474        let result = execute(&cli).await.unwrap();
2475        assert!(result.contains("Created session"));
2476    }
2477
2478    #[tokio::test]
2479    async fn test_execute_spawn_auto_attach() {
2480        let node = start_test_server().await;
2481        let cli = Cli {
2482            node,
2483            token: None,
2484            command: Some(Commands::Spawn {
2485                name: Some("test".into()),
2486                workdir: Some("/tmp/repo".into()),
2487                ink: None,
2488                description: None,
2489                detach: false,
2490                idle_threshold: None,
2491                auto: false,
2492                worktree: false,
2493                worktree_base: None,
2494                runtime: None,
2495                secret: vec![],
2496                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2497            }),
2498            path: None,
2499        };
2500        let result = execute(&cli).await.unwrap();
2501        // When not detached, spawn prints creation to stderr and returns detach message
2502        assert!(result.contains("Detached from session"));
2503    }
2504
2505    #[tokio::test]
2506    async fn test_execute_stop_success() {
2507        let node = start_test_server().await;
2508        let cli = Cli {
2509            node,
2510            token: None,
2511            command: Some(Commands::Stop {
2512                names: vec!["test-session".into()],
2513                purge: false,
2514            }),
2515            path: None,
2516        };
2517        let result = execute(&cli).await.unwrap();
2518        assert!(result.contains("stopped"));
2519        assert!(!result.contains("purged"));
2520    }
2521
2522    #[tokio::test]
2523    async fn test_execute_stop_with_purge() {
2524        let node = start_test_server().await;
2525        let cli = Cli {
2526            node,
2527            token: None,
2528            command: Some(Commands::Stop {
2529                names: vec!["test-session".into()],
2530                purge: true,
2531            }),
2532            path: None,
2533        };
2534        let result = execute(&cli).await.unwrap();
2535        assert!(result.contains("stopped and purged"));
2536    }
2537
2538    #[tokio::test]
2539    async fn test_execute_logs_success() {
2540        let node = start_test_server().await;
2541        let cli = Cli {
2542            node,
2543            token: None,
2544            command: Some(Commands::Logs {
2545                name: "test-session".into(),
2546                lines: 50,
2547                follow: false,
2548            }),
2549            path: None,
2550        };
2551        let result = execute(&cli).await.unwrap();
2552        assert!(result.contains("test output"));
2553    }
2554
2555    #[tokio::test]
2556    async fn test_execute_list_connection_refused() {
2557        let cli = Cli {
2558            node: "localhost:1".into(),
2559            token: None,
2560            command: Some(Commands::List { all: false }),
2561            path: None,
2562        };
2563        let result = execute(&cli).await;
2564        let err = result.unwrap_err().to_string();
2565        assert!(
2566            err.contains("Could not connect to pulpod"),
2567            "Expected friendly error, got: {err}"
2568        );
2569        assert!(err.contains("localhost:1"));
2570    }
2571
2572    #[tokio::test]
2573    async fn test_execute_nodes_connection_refused() {
2574        let cli = Cli {
2575            node: "localhost:1".into(),
2576            token: None,
2577            command: Some(Commands::Nodes),
2578            path: None,
2579        };
2580        let result = execute(&cli).await;
2581        let err = result.unwrap_err().to_string();
2582        assert!(err.contains("Could not connect to pulpod"));
2583    }
2584
2585    #[tokio::test]
2586    async fn test_execute_stop_error_response() {
2587        use axum::{Router, http::StatusCode, routing::post};
2588
2589        let app = Router::new().route(
2590            "/api/v1/sessions/{id}/stop",
2591            post(|| async {
2592                (
2593                    StatusCode::NOT_FOUND,
2594                    "{\"error\":\"session not found: test-session\"}",
2595                )
2596            }),
2597        );
2598        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2599        let addr = listener.local_addr().unwrap();
2600        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2601        let node = format!("127.0.0.1:{}", addr.port());
2602
2603        let cli = Cli {
2604            node,
2605            token: None,
2606            command: Some(Commands::Stop {
2607                names: vec!["test-session".into()],
2608                purge: false,
2609            }),
2610            path: None,
2611        };
2612        let result = execute(&cli).await.unwrap();
2613        assert!(result.contains("Error stopping test-session"), "{result}");
2614    }
2615
2616    #[tokio::test]
2617    async fn test_execute_logs_error_response() {
2618        use axum::{Router, http::StatusCode, routing::get};
2619
2620        let app = Router::new().route(
2621            "/api/v1/sessions/{id}/output",
2622            get(|| async {
2623                (
2624                    StatusCode::NOT_FOUND,
2625                    "{\"error\":\"session not found: ghost\"}",
2626                )
2627            }),
2628        );
2629        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2630        let addr = listener.local_addr().unwrap();
2631        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2632        let node = format!("127.0.0.1:{}", addr.port());
2633
2634        let cli = Cli {
2635            node,
2636            token: None,
2637            command: Some(Commands::Logs {
2638                name: "ghost".into(),
2639                lines: 50,
2640                follow: false,
2641            }),
2642            path: None,
2643        };
2644        let err = execute(&cli).await.unwrap_err();
2645        assert_eq!(err.to_string(), "session not found: ghost");
2646    }
2647
2648    #[tokio::test]
2649    async fn test_execute_resume_error_response() {
2650        use axum::{Router, http::StatusCode, routing::post};
2651
2652        let app = Router::new().route(
2653            "/api/v1/sessions/{id}/resume",
2654            post(|| async {
2655                (
2656                    StatusCode::BAD_REQUEST,
2657                    "{\"error\":\"session is not lost (status: active)\"}",
2658                )
2659            }),
2660        );
2661        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2662        let addr = listener.local_addr().unwrap();
2663        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2664        let node = format!("127.0.0.1:{}", addr.port());
2665
2666        let cli = Cli {
2667            node,
2668            token: None,
2669            command: Some(Commands::Resume {
2670                name: "test-session".into(),
2671            }),
2672            path: None,
2673        };
2674        let err = execute(&cli).await.unwrap_err();
2675        assert_eq!(err.to_string(), "session is not lost (status: active)");
2676    }
2677
2678    #[tokio::test]
2679    async fn test_execute_spawn_error_response() {
2680        use axum::{Router, http::StatusCode, routing::post};
2681
2682        let app = Router::new().route(
2683            "/api/v1/sessions",
2684            post(|| async {
2685                (
2686                    StatusCode::INTERNAL_SERVER_ERROR,
2687                    "{\"error\":\"failed to spawn session\"}",
2688                )
2689            }),
2690        );
2691        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2692        let addr = listener.local_addr().unwrap();
2693        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2694        let node = format!("127.0.0.1:{}", addr.port());
2695
2696        let cli = Cli {
2697            node,
2698            token: None,
2699            command: Some(Commands::Spawn {
2700                name: Some("test".into()),
2701                workdir: Some("/tmp/repo".into()),
2702                ink: None,
2703                description: None,
2704                detach: true,
2705                idle_threshold: None,
2706                auto: false,
2707                worktree: false,
2708                worktree_base: None,
2709                runtime: None,
2710                secret: vec![],
2711                command: vec!["test".into()],
2712            }),
2713            path: None,
2714        };
2715        let err = execute(&cli).await.unwrap_err();
2716        assert_eq!(err.to_string(), "failed to spawn session");
2717    }
2718
2719    #[tokio::test]
2720    async fn test_execute_interventions_error_response() {
2721        use axum::{Router, http::StatusCode, routing::get};
2722
2723        let app = Router::new().route(
2724            "/api/v1/sessions/{id}/interventions",
2725            get(|| async {
2726                (
2727                    StatusCode::NOT_FOUND,
2728                    "{\"error\":\"session not found: ghost\"}",
2729                )
2730            }),
2731        );
2732        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2733        let addr = listener.local_addr().unwrap();
2734        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2735        let node = format!("127.0.0.1:{}", addr.port());
2736
2737        let cli = Cli {
2738            node,
2739            token: None,
2740            command: Some(Commands::Interventions {
2741                name: "ghost".into(),
2742            }),
2743            path: None,
2744        };
2745        let err = execute(&cli).await.unwrap_err();
2746        assert_eq!(err.to_string(), "session not found: ghost");
2747    }
2748
2749    #[tokio::test]
2750    async fn test_execute_resume_success() {
2751        let node = start_test_server().await;
2752        let cli = Cli {
2753            node,
2754            token: None,
2755            command: Some(Commands::Resume {
2756                name: "test-session".into(),
2757            }),
2758            path: None,
2759        };
2760        let result = execute(&cli).await.unwrap();
2761        assert!(result.contains("Detached from session"));
2762    }
2763
2764    #[tokio::test]
2765    async fn test_execute_input_success() {
2766        let node = start_test_server().await;
2767        let cli = Cli {
2768            node,
2769            token: None,
2770            command: Some(Commands::Input {
2771                name: "test-session".into(),
2772                text: Some("yes".into()),
2773            }),
2774            path: None,
2775        };
2776        let result = execute(&cli).await.unwrap();
2777        assert!(result.contains("Sent input to session test-session"));
2778    }
2779
2780    #[tokio::test]
2781    async fn test_execute_input_no_text() {
2782        let node = start_test_server().await;
2783        let cli = Cli {
2784            node,
2785            token: None,
2786            command: Some(Commands::Input {
2787                name: "test-session".into(),
2788                text: None,
2789            }),
2790            path: None,
2791        };
2792        let result = execute(&cli).await.unwrap();
2793        assert!(result.contains("Sent input to session test-session"));
2794    }
2795
2796    #[tokio::test]
2797    async fn test_execute_input_connection_refused() {
2798        let cli = Cli {
2799            node: "localhost:1".into(),
2800            token: None,
2801            command: Some(Commands::Input {
2802                name: "test".into(),
2803                text: Some("y".into()),
2804            }),
2805            path: None,
2806        };
2807        let result = execute(&cli).await;
2808        let err = result.unwrap_err().to_string();
2809        assert!(err.contains("Could not connect to pulpod"));
2810    }
2811
2812    #[tokio::test]
2813    async fn test_execute_input_error_response() {
2814        use axum::{Router, http::StatusCode, routing::post};
2815
2816        let app = Router::new().route(
2817            "/api/v1/sessions/{id}/input",
2818            post(|| async {
2819                (
2820                    StatusCode::NOT_FOUND,
2821                    "{\"error\":\"session not found: ghost\"}",
2822                )
2823            }),
2824        );
2825        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2826        let addr = listener.local_addr().unwrap();
2827        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2828        let node = format!("127.0.0.1:{}", addr.port());
2829
2830        let cli = Cli {
2831            node,
2832            token: None,
2833            command: Some(Commands::Input {
2834                name: "ghost".into(),
2835                text: Some("y".into()),
2836            }),
2837            path: None,
2838        };
2839        let err = execute(&cli).await.unwrap_err();
2840        assert_eq!(err.to_string(), "session not found: ghost");
2841    }
2842
2843    #[tokio::test]
2844    async fn test_execute_ui() {
2845        let cli = Cli {
2846            node: "localhost:7433".into(),
2847            token: None,
2848            command: Some(Commands::Ui),
2849            path: None,
2850        };
2851        let result = execute(&cli).await.unwrap();
2852        assert!(result.contains("Opening"));
2853        assert!(result.contains("http://localhost:7433"));
2854    }
2855
2856    #[tokio::test]
2857    async fn test_execute_ui_custom_node() {
2858        let cli = Cli {
2859            node: "mac-mini:7433".into(),
2860            token: None,
2861            command: Some(Commands::Ui),
2862            path: None,
2863        };
2864        let result = execute(&cli).await.unwrap();
2865        assert!(result.contains("http://mac-mini:7433"));
2866    }
2867
2868    #[test]
2869    fn test_format_sessions_empty() {
2870        assert_eq!(format_sessions(&[]), "No sessions.");
2871    }
2872
2873    #[test]
2874    fn test_format_sessions_with_data() {
2875        use chrono::Utc;
2876        use pulpo_common::session::SessionStatus;
2877        use uuid::Uuid;
2878
2879        let sessions = vec![Session {
2880            id: Uuid::nil(),
2881            name: "my-api".into(),
2882            workdir: "/tmp/repo".into(),
2883            command: "claude -p 'Fix the bug'".into(),
2884            description: Some("Fix the bug".into()),
2885            status: SessionStatus::Active,
2886            exit_code: None,
2887            backend_session_id: None,
2888            output_snapshot: None,
2889            metadata: None,
2890            ink: None,
2891            intervention_code: None,
2892            intervention_reason: None,
2893            intervention_at: None,
2894            last_output_at: None,
2895            idle_since: None,
2896            idle_threshold_secs: None,
2897            worktree_path: None,
2898            worktree_branch: None,
2899            git_branch: None,
2900            git_commit: None,
2901            git_files_changed: None,
2902            git_insertions: None,
2903            git_deletions: None,
2904            git_ahead: None,
2905            runtime: Runtime::Tmux,
2906            created_at: Utc::now(),
2907            updated_at: Utc::now(),
2908        }];
2909        let output = format_sessions(&sessions);
2910        assert!(output.contains("ID"));
2911        assert!(output.contains("NAME"));
2912        assert!(output.contains("REPO"));
2913        assert!(output.contains("COMMAND"));
2914        assert!(output.contains("00000000"));
2915        assert!(output.contains("my-api"));
2916        assert!(output.contains("active"));
2917        assert!(output.contains("claude -p 'Fix the bug'"));
2918    }
2919
2920    #[test]
2921    fn test_format_repo_without_branch() {
2922        let s = repo_session("/home/user/test", None);
2923        assert_eq!(format_repo(&s), "test");
2924    }
2925
2926    #[test]
2927    fn test_format_repo_with_branch() {
2928        let s = repo_session("/home/user/pulpo", Some("main"));
2929        assert_eq!(format_repo(&s), "pulpo@main");
2930    }
2931
2932    #[test]
2933    fn test_format_repo_truncates_long() {
2934        let s = repo_session(
2935            "/home/user/my-very-long-repo",
2936            Some("feature/my-long-branch"),
2937        );
2938        let result = format_repo(&s);
2939        assert!(result.len() <= 30);
2940        assert!(result.ends_with("..."));
2941    }
2942
2943    #[test]
2944    fn test_format_repo_root_path() {
2945        let s = repo_session("/", None);
2946        assert_eq!(format_repo(&s), "");
2947    }
2948
2949    #[test]
2950    fn test_format_repo_with_diff_stats() {
2951        let mut s = repo_session("/home/user/pulpo", Some("main"));
2952        s.git_insertions = Some(42);
2953        s.git_deletions = Some(7);
2954        let result = format_repo(&s);
2955        assert!(result.contains("+42/-7"));
2956    }
2957
2958    #[test]
2959    fn test_format_repo_with_ahead() {
2960        let mut s = repo_session("/home/user/pulpo", Some("main"));
2961        s.git_ahead = Some(3);
2962        let result = format_repo(&s);
2963        assert!(result.contains("\u{2191}3"));
2964    }
2965
2966    #[test]
2967    fn test_format_repo_zero_diff_hidden() {
2968        let mut s = repo_session("/home/user/pulpo", None);
2969        s.git_insertions = Some(0);
2970        s.git_deletions = Some(0);
2971        let result = format_repo(&s);
2972        assert!(!result.contains("+0/-0"));
2973    }
2974
2975    #[test]
2976    fn test_format_repo_zero_ahead_hidden() {
2977        let mut s = repo_session("/home/user/pulpo", None);
2978        s.git_ahead = Some(0);
2979        let result = format_repo(&s);
2980        assert!(!result.contains('\u{2191}'));
2981    }
2982
2983    #[test]
2984    fn test_format_sessions_with_git_branch() {
2985        use chrono::Utc;
2986        use pulpo_common::session::SessionStatus;
2987        use uuid::Uuid;
2988
2989        let sessions = vec![Session {
2990            id: Uuid::nil(),
2991            name: "my-api".into(),
2992            workdir: "/tmp/repo".into(),
2993            command: "echo hello".into(),
2994            description: None,
2995            status: SessionStatus::Active,
2996            exit_code: None,
2997            backend_session_id: None,
2998            output_snapshot: None,
2999            metadata: None,
3000            ink: None,
3001            intervention_code: None,
3002            intervention_reason: None,
3003            intervention_at: None,
3004            last_output_at: None,
3005            idle_since: None,
3006            idle_threshold_secs: None,
3007            worktree_path: None,
3008            worktree_branch: None,
3009            git_branch: Some("main".into()),
3010            git_commit: Some("abc1234".into()),
3011            git_files_changed: None,
3012            git_insertions: None,
3013            git_deletions: None,
3014            git_ahead: None,
3015            runtime: Runtime::Tmux,
3016            created_at: Utc::now(),
3017            updated_at: Utc::now(),
3018        }];
3019        let output = format_sessions(&sessions);
3020        assert!(output.contains("repo@main"));
3021    }
3022
3023    #[test]
3024    fn test_format_sessions_with_error_status() {
3025        use chrono::Utc;
3026        use pulpo_common::session::SessionStatus;
3027        use uuid::Uuid;
3028
3029        let mut meta = std::collections::HashMap::new();
3030        meta.insert("error_status".into(), "Compile error".into());
3031        let sessions = vec![Session {
3032            id: Uuid::nil(),
3033            name: "my-api".into(),
3034            workdir: "/tmp/repo".into(),
3035            command: "echo hello".into(),
3036            description: None,
3037            status: SessionStatus::Active,
3038            exit_code: None,
3039            backend_session_id: None,
3040            output_snapshot: None,
3041            metadata: Some(meta),
3042            ink: None,
3043            intervention_code: None,
3044            intervention_reason: None,
3045            intervention_at: None,
3046            last_output_at: None,
3047            idle_since: None,
3048            idle_threshold_secs: None,
3049            worktree_path: None,
3050            worktree_branch: None,
3051            git_branch: None,
3052            git_commit: None,
3053            git_files_changed: None,
3054            git_insertions: None,
3055            git_deletions: None,
3056            git_ahead: None,
3057            runtime: Runtime::Tmux,
3058            created_at: Utc::now(),
3059            updated_at: Utc::now(),
3060        }];
3061        let output = format_sessions(&sessions);
3062        assert!(output.contains("[!]"));
3063    }
3064
3065    #[test]
3066    fn test_format_sessions_docker_runtime() {
3067        use chrono::Utc;
3068        use pulpo_common::session::SessionStatus;
3069        use uuid::Uuid;
3070
3071        let sessions = vec![Session {
3072            id: Uuid::nil(),
3073            name: "sandbox-test".into(),
3074            workdir: "/tmp".into(),
3075            command: "claude".into(),
3076            description: None,
3077            status: SessionStatus::Active,
3078            exit_code: None,
3079            backend_session_id: Some("docker:pulpo-sandbox-test".into()),
3080            output_snapshot: None,
3081            metadata: None,
3082            ink: None,
3083            intervention_code: None,
3084            intervention_reason: None,
3085            intervention_at: None,
3086            last_output_at: None,
3087            idle_since: None,
3088            idle_threshold_secs: None,
3089            worktree_path: None,
3090            worktree_branch: None,
3091            git_branch: None,
3092            git_commit: None,
3093            git_files_changed: None,
3094            git_insertions: None,
3095            git_deletions: None,
3096            git_ahead: None,
3097            runtime: Runtime::Docker,
3098            created_at: Utc::now(),
3099            updated_at: Utc::now(),
3100        }];
3101        let output = format_sessions(&sessions);
3102        assert!(output.contains("tmp"));
3103    }
3104
3105    #[test]
3106    fn test_format_sessions_long_command_truncated() {
3107        use chrono::Utc;
3108        use pulpo_common::session::SessionStatus;
3109        use uuid::Uuid;
3110
3111        let sessions = vec![Session {
3112            id: Uuid::nil(),
3113            name: "test".into(),
3114            workdir: "/tmp".into(),
3115            command:
3116                "claude -p 'A very long command that exceeds fifty characters in total length here'"
3117                    .into(),
3118            description: None,
3119            status: SessionStatus::Ready,
3120            exit_code: None,
3121            backend_session_id: None,
3122            output_snapshot: None,
3123            metadata: None,
3124            ink: None,
3125            intervention_code: None,
3126            intervention_reason: None,
3127            intervention_at: None,
3128            last_output_at: None,
3129            idle_since: None,
3130            idle_threshold_secs: None,
3131            worktree_path: None,
3132            worktree_branch: None,
3133            git_branch: None,
3134            git_commit: None,
3135            git_files_changed: None,
3136            git_insertions: None,
3137            git_deletions: None,
3138            git_ahead: None,
3139            runtime: Runtime::Tmux,
3140            created_at: Utc::now(),
3141            updated_at: Utc::now(),
3142        }];
3143        let output = format_sessions(&sessions);
3144        assert!(output.contains("..."));
3145    }
3146
3147    #[test]
3148    fn test_format_sessions_worktree_indicator() {
3149        use chrono::Utc;
3150        use pulpo_common::session::SessionStatus;
3151        use uuid::Uuid;
3152
3153        let sessions = vec![Session {
3154            id: Uuid::nil(),
3155            name: "wt-task".into(),
3156            workdir: "/repo".into(),
3157            command: "claude".into(),
3158            description: None,
3159            status: SessionStatus::Active,
3160            exit_code: None,
3161            backend_session_id: None,
3162            output_snapshot: None,
3163            metadata: None,
3164            ink: None,
3165            intervention_code: None,
3166            intervention_reason: None,
3167            intervention_at: None,
3168            last_output_at: None,
3169            idle_since: None,
3170            idle_threshold_secs: None,
3171            worktree_path: Some("/home/user/.pulpo/worktrees/wt-task".into()),
3172            worktree_branch: Some("wt-task".into()),
3173            git_branch: None,
3174            git_commit: None,
3175            git_files_changed: None,
3176            git_insertions: None,
3177            git_deletions: None,
3178            git_ahead: None,
3179            runtime: Runtime::Tmux,
3180            created_at: Utc::now(),
3181            updated_at: Utc::now(),
3182        }];
3183        let output = format_sessions(&sessions);
3184        assert!(
3185            output.contains("[wt]"),
3186            "should show worktree indicator: {output}"
3187        );
3188        assert!(output.contains("wt-task [wt]"));
3189    }
3190
3191    #[test]
3192    fn test_format_sessions_pr_indicator() {
3193        use chrono::Utc;
3194        use pulpo_common::session::SessionStatus;
3195        use std::collections::HashMap;
3196        use uuid::Uuid;
3197
3198        let mut meta = HashMap::new();
3199        meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
3200        let sessions = vec![Session {
3201            id: Uuid::nil(),
3202            name: "pr-task".into(),
3203            workdir: "/tmp".into(),
3204            command: "claude".into(),
3205            description: None,
3206            status: SessionStatus::Active,
3207            exit_code: None,
3208            backend_session_id: None,
3209            output_snapshot: None,
3210            metadata: Some(meta),
3211            ink: None,
3212            intervention_code: None,
3213            intervention_reason: None,
3214            intervention_at: None,
3215            last_output_at: None,
3216            idle_since: None,
3217            idle_threshold_secs: None,
3218            worktree_path: None,
3219            worktree_branch: None,
3220            git_branch: None,
3221            git_commit: None,
3222            git_files_changed: None,
3223            git_insertions: None,
3224            git_deletions: None,
3225            git_ahead: None,
3226            runtime: Runtime::Tmux,
3227            created_at: Utc::now(),
3228            updated_at: Utc::now(),
3229        }];
3230        let output = format_sessions(&sessions);
3231        assert!(
3232            output.contains("[PR]"),
3233            "should show PR indicator: {output}"
3234        );
3235        assert!(output.contains("pr-task [PR]"));
3236    }
3237
3238    #[test]
3239    fn test_format_sessions_worktree_and_pr_indicator() {
3240        use chrono::Utc;
3241        use pulpo_common::session::SessionStatus;
3242        use std::collections::HashMap;
3243        use uuid::Uuid;
3244
3245        let mut meta = HashMap::new();
3246        meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
3247        let sessions = vec![Session {
3248            id: Uuid::nil(),
3249            name: "both-task".into(),
3250            workdir: "/tmp".into(),
3251            command: "claude".into(),
3252            description: None,
3253            status: SessionStatus::Active,
3254            exit_code: None,
3255            backend_session_id: None,
3256            output_snapshot: None,
3257            metadata: Some(meta),
3258            ink: None,
3259            intervention_code: None,
3260            intervention_reason: None,
3261            intervention_at: None,
3262            last_output_at: None,
3263            idle_since: None,
3264            idle_threshold_secs: None,
3265            worktree_path: Some("/home/user/.pulpo/worktrees/both-task".into()),
3266            worktree_branch: Some("both-task".into()),
3267            git_branch: None,
3268            git_commit: None,
3269            git_files_changed: None,
3270            git_insertions: None,
3271            git_deletions: None,
3272            git_ahead: None,
3273            runtime: Runtime::Tmux,
3274            created_at: Utc::now(),
3275            updated_at: Utc::now(),
3276        }];
3277        let output = format_sessions(&sessions);
3278        assert!(
3279            output.contains("[wt] [PR]"),
3280            "should show both indicators: {output}"
3281        );
3282    }
3283
3284    #[test]
3285    fn test_format_sessions_no_pr_without_metadata() {
3286        use chrono::Utc;
3287        use pulpo_common::session::SessionStatus;
3288        use uuid::Uuid;
3289
3290        let sessions = vec![Session {
3291            id: Uuid::nil(),
3292            name: "no-pr".into(),
3293            workdir: "/tmp".into(),
3294            command: "claude".into(),
3295            description: None,
3296            status: SessionStatus::Active,
3297            exit_code: None,
3298            backend_session_id: None,
3299            output_snapshot: None,
3300            metadata: None,
3301            ink: None,
3302            intervention_code: None,
3303            intervention_reason: None,
3304            intervention_at: None,
3305            last_output_at: None,
3306            idle_since: None,
3307            idle_threshold_secs: None,
3308            worktree_path: None,
3309            worktree_branch: None,
3310            git_branch: None,
3311            git_commit: None,
3312            git_files_changed: None,
3313            git_insertions: None,
3314            git_deletions: None,
3315            git_ahead: None,
3316            runtime: Runtime::Tmux,
3317            created_at: Utc::now(),
3318            updated_at: Utc::now(),
3319        }];
3320        let output = format_sessions(&sessions);
3321        assert!(
3322            !output.contains("[PR]"),
3323            "should not show PR indicator: {output}"
3324        );
3325    }
3326
3327    #[test]
3328    fn test_format_nodes() {
3329        use pulpo_common::node::NodeInfo;
3330        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
3331
3332        let resp = PeersResponse {
3333            local: NodeInfo {
3334                name: "mac-mini".into(),
3335                hostname: "h".into(),
3336                os: "macos".into(),
3337                arch: "arm64".into(),
3338                cpus: 8,
3339                memory_mb: 16384,
3340                gpu: None,
3341            },
3342            peers: vec![PeerInfo {
3343                name: "win-pc".into(),
3344                address: "win-pc:7433".into(),
3345                status: PeerStatus::Online,
3346                node_info: None,
3347                session_count: Some(3),
3348                source: PeerSource::Configured,
3349            }],
3350        };
3351        let output = format_nodes(&resp);
3352        assert!(output.contains("mac-mini"));
3353        assert!(output.contains("(local)"));
3354        assert!(output.contains("win-pc"));
3355        assert!(output.contains('3'));
3356    }
3357
3358    #[test]
3359    fn test_format_nodes_no_session_count() {
3360        use pulpo_common::node::NodeInfo;
3361        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
3362
3363        let resp = PeersResponse {
3364            local: NodeInfo {
3365                name: "local".into(),
3366                hostname: "h".into(),
3367                os: "linux".into(),
3368                arch: "x86_64".into(),
3369                cpus: 4,
3370                memory_mb: 8192,
3371                gpu: None,
3372            },
3373            peers: vec![PeerInfo {
3374                name: "peer".into(),
3375                address: "peer:7433".into(),
3376                status: PeerStatus::Offline,
3377                node_info: None,
3378                session_count: None,
3379                source: PeerSource::Configured,
3380            }],
3381        };
3382        let output = format_nodes(&resp);
3383        assert!(output.contains("offline"));
3384        // No session count → shows "-"
3385        let lines: Vec<&str> = output.lines().collect();
3386        assert!(lines[2].contains('-'));
3387    }
3388
3389    #[tokio::test]
3390    async fn test_execute_resume_connection_refused() {
3391        let cli = Cli {
3392            node: "localhost:1".into(),
3393            token: None,
3394            command: Some(Commands::Resume {
3395                name: "test".into(),
3396            }),
3397            path: None,
3398        };
3399        let result = execute(&cli).await;
3400        let err = result.unwrap_err().to_string();
3401        assert!(err.contains("Could not connect to pulpod"));
3402    }
3403
3404    #[tokio::test]
3405    async fn test_execute_spawn_connection_refused() {
3406        let cli = Cli {
3407            node: "localhost:1".into(),
3408            token: None,
3409            command: Some(Commands::Spawn {
3410                name: Some("test".into()),
3411                workdir: Some("/tmp".into()),
3412                ink: None,
3413                description: None,
3414                detach: true,
3415                idle_threshold: None,
3416                auto: false,
3417                worktree: false,
3418                worktree_base: None,
3419                runtime: None,
3420                secret: vec![],
3421                command: vec!["test".into()],
3422            }),
3423            path: None,
3424        };
3425        let result = execute(&cli).await;
3426        let err = result.unwrap_err().to_string();
3427        assert!(err.contains("Could not connect to pulpod"));
3428    }
3429
3430    #[tokio::test]
3431    async fn test_execute_stop_connection_refused() {
3432        let cli = Cli {
3433            node: "localhost:1".into(),
3434            token: None,
3435            command: Some(Commands::Stop {
3436                names: vec!["test".into()],
3437                purge: false,
3438            }),
3439            path: None,
3440        };
3441        let result = execute(&cli).await;
3442        let err = result.unwrap_err().to_string();
3443        assert!(err.contains("Could not connect to pulpod"));
3444    }
3445
3446    #[tokio::test]
3447    async fn test_execute_logs_connection_refused() {
3448        let cli = Cli {
3449            node: "localhost:1".into(),
3450            token: None,
3451            command: Some(Commands::Logs {
3452                name: "test".into(),
3453                lines: 50,
3454                follow: false,
3455            }),
3456            path: None,
3457        };
3458        let result = execute(&cli).await;
3459        let err = result.unwrap_err().to_string();
3460        assert!(err.contains("Could not connect to pulpod"));
3461    }
3462
3463    #[tokio::test]
3464    async fn test_friendly_error_connect() {
3465        // Make a request to a closed port to get a connect error
3466        let err = reqwest::Client::new()
3467            .get("http://127.0.0.1:1")
3468            .send()
3469            .await
3470            .unwrap_err();
3471        let friendly = friendly_error(&err, "test-node:1");
3472        let msg = friendly.to_string();
3473        assert!(
3474            msg.contains("Could not connect"),
3475            "Expected connect message, got: {msg}"
3476        );
3477    }
3478
3479    #[tokio::test]
3480    async fn test_friendly_error_other() {
3481        // A request to an invalid URL creates a builder error, not a connect error
3482        let err = reqwest::Client::new()
3483            .get("http://[::invalid::url")
3484            .send()
3485            .await
3486            .unwrap_err();
3487        let friendly = friendly_error(&err, "bad-host");
3488        let msg = friendly.to_string();
3489        assert!(
3490            msg.contains("Network error"),
3491            "Expected network error message, got: {msg}"
3492        );
3493        assert!(msg.contains("bad-host"));
3494    }
3495
3496    // -- Auth helper tests --
3497
3498    #[test]
3499    fn test_is_localhost_variants() {
3500        assert!(is_localhost("localhost:7433"));
3501        assert!(is_localhost("127.0.0.1:7433"));
3502        assert!(is_localhost("[::1]:7433"));
3503        assert!(is_localhost("::1"));
3504        assert!(is_localhost("localhost"));
3505        assert!(!is_localhost("mac-mini:7433"));
3506        assert!(!is_localhost("192.168.1.100:7433"));
3507    }
3508
3509    #[test]
3510    fn test_authed_get_with_token() {
3511        let client = reqwest::Client::new();
3512        let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
3513            .build()
3514            .unwrap();
3515        let auth = req
3516            .headers()
3517            .get("authorization")
3518            .unwrap()
3519            .to_str()
3520            .unwrap();
3521        assert_eq!(auth, "Bearer tok");
3522    }
3523
3524    #[test]
3525    fn test_authed_get_without_token() {
3526        let client = reqwest::Client::new();
3527        let req = authed_get(&client, "http://h:1/api".into(), None)
3528            .build()
3529            .unwrap();
3530        assert!(req.headers().get("authorization").is_none());
3531    }
3532
3533    #[test]
3534    fn test_authed_post_with_token() {
3535        let client = reqwest::Client::new();
3536        let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
3537            .build()
3538            .unwrap();
3539        let auth = req
3540            .headers()
3541            .get("authorization")
3542            .unwrap()
3543            .to_str()
3544            .unwrap();
3545        assert_eq!(auth, "Bearer secret");
3546    }
3547
3548    #[test]
3549    fn test_authed_post_without_token() {
3550        let client = reqwest::Client::new();
3551        let req = authed_post(&client, "http://h:1/api".into(), None)
3552            .build()
3553            .unwrap();
3554        assert!(req.headers().get("authorization").is_none());
3555    }
3556
3557    #[test]
3558    fn test_authed_delete_with_token() {
3559        let client = reqwest::Client::new();
3560        let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
3561            .build()
3562            .unwrap();
3563        let auth = req
3564            .headers()
3565            .get("authorization")
3566            .unwrap()
3567            .to_str()
3568            .unwrap();
3569        assert_eq!(auth, "Bearer del-tok");
3570    }
3571
3572    #[test]
3573    fn test_authed_delete_without_token() {
3574        let client = reqwest::Client::new();
3575        let req = authed_delete(&client, "http://h:1/api".into(), None)
3576            .build()
3577            .unwrap();
3578        assert!(req.headers().get("authorization").is_none());
3579    }
3580
3581    #[tokio::test]
3582    async fn test_resolve_token_explicit() {
3583        let client = reqwest::Client::new();
3584        let token =
3585            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
3586        assert_eq!(token, Some("my-tok".into()));
3587    }
3588
3589    #[tokio::test]
3590    async fn test_resolve_token_remote_no_explicit() {
3591        let client = reqwest::Client::new();
3592        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
3593        assert_eq!(token, None);
3594    }
3595
3596    #[tokio::test]
3597    async fn test_resolve_token_localhost_auto_discover() {
3598        use axum::{Json, Router, routing::get};
3599
3600        let app = Router::new().route(
3601            "/api/v1/auth/token",
3602            get(|| async {
3603                Json(AuthTokenResponse {
3604                    token: "discovered".into(),
3605                })
3606            }),
3607        );
3608        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3609        let addr = listener.local_addr().unwrap();
3610        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3611
3612        let node = format!("localhost:{}", addr.port());
3613        let base = base_url(&node);
3614        let client = reqwest::Client::new();
3615        let token = resolve_token(&client, &base, &node, None).await;
3616        assert_eq!(token, Some("discovered".into()));
3617    }
3618
3619    #[tokio::test]
3620    async fn test_discover_token_empty_returns_none() {
3621        use axum::{Json, Router, routing::get};
3622
3623        let app = Router::new().route(
3624            "/api/v1/auth/token",
3625            get(|| async {
3626                Json(AuthTokenResponse {
3627                    token: String::new(),
3628                })
3629            }),
3630        );
3631        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3632        let addr = listener.local_addr().unwrap();
3633        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3634
3635        let base = format!("http://127.0.0.1:{}", addr.port());
3636        let client = reqwest::Client::new();
3637        assert_eq!(discover_token(&client, &base).await, None);
3638    }
3639
3640    #[tokio::test]
3641    async fn test_discover_token_unreachable_returns_none() {
3642        let client = reqwest::Client::new();
3643        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3644    }
3645
3646    #[test]
3647    fn test_cli_parse_with_token() {
3648        let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
3649        assert_eq!(cli.token, Some("my-secret".into()));
3650    }
3651
3652    #[test]
3653    fn test_cli_parse_without_token() {
3654        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
3655        assert_eq!(cli.token, None);
3656    }
3657
3658    #[tokio::test]
3659    async fn test_execute_with_explicit_token_sends_header() {
3660        use axum::{Router, extract::Request, http::StatusCode, routing::get};
3661
3662        let app = Router::new().route(
3663            "/api/v1/sessions",
3664            get(|req: Request| async move {
3665                let auth = req
3666                    .headers()
3667                    .get("authorization")
3668                    .and_then(|v| v.to_str().ok())
3669                    .unwrap_or("");
3670                assert_eq!(auth, "Bearer test-token");
3671                (StatusCode::OK, "[]".to_owned())
3672            }),
3673        );
3674        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3675        let addr = listener.local_addr().unwrap();
3676        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3677        let node = format!("127.0.0.1:{}", addr.port());
3678
3679        let cli = Cli {
3680            node,
3681            token: Some("test-token".into()),
3682            command: Some(Commands::List { all: false }),
3683            path: None,
3684        };
3685        let result = execute(&cli).await.unwrap();
3686        assert_eq!(result, "No sessions.");
3687    }
3688
3689    // -- Interventions tests --
3690
3691    #[test]
3692    fn test_cli_parse_interventions() {
3693        let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
3694        assert!(matches!(
3695            &cli.command,
3696            Some(Commands::Interventions { name }) if name == "my-session"
3697        ));
3698    }
3699
3700    #[test]
3701    fn test_format_interventions_empty() {
3702        assert_eq!(format_interventions(&[]), "No intervention events.");
3703    }
3704
3705    #[test]
3706    fn test_format_interventions_with_data() {
3707        let events = vec![
3708            InterventionEventResponse {
3709                id: 1,
3710                session_id: "sess-1".into(),
3711                code: None,
3712                reason: "Memory exceeded threshold".into(),
3713                created_at: "2026-01-01T00:00:00Z".into(),
3714            },
3715            InterventionEventResponse {
3716                id: 2,
3717                session_id: "sess-1".into(),
3718                code: None,
3719                reason: "Idle for 10 minutes".into(),
3720                created_at: "2026-01-02T00:00:00Z".into(),
3721            },
3722        ];
3723        let output = format_interventions(&events);
3724        assert!(output.contains("ID"));
3725        assert!(output.contains("TIMESTAMP"));
3726        assert!(output.contains("REASON"));
3727        assert!(output.contains("Memory exceeded threshold"));
3728        assert!(output.contains("Idle for 10 minutes"));
3729        assert!(output.contains("2026-01-01T00:00:00Z"));
3730    }
3731
3732    #[tokio::test]
3733    async fn test_execute_interventions_empty() {
3734        let node = start_test_server().await;
3735        let cli = Cli {
3736            node,
3737            token: None,
3738            command: Some(Commands::Interventions {
3739                name: "my-session".into(),
3740            }),
3741            path: None,
3742        };
3743        let result = execute(&cli).await.unwrap();
3744        assert_eq!(result, "No intervention events.");
3745    }
3746
3747    #[tokio::test]
3748    async fn test_execute_interventions_with_data() {
3749        use axum::{Router, routing::get};
3750
3751        let app = Router::new().route(
3752            "/api/v1/sessions/{id}/interventions",
3753            get(|| async {
3754                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3755                    .to_owned()
3756            }),
3757        );
3758        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3759        let addr = listener.local_addr().unwrap();
3760        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3761        let node = format!("127.0.0.1:{}", addr.port());
3762
3763        let cli = Cli {
3764            node,
3765            token: None,
3766            command: Some(Commands::Interventions {
3767                name: "test".into(),
3768            }),
3769            path: None,
3770        };
3771        let result = execute(&cli).await.unwrap();
3772        assert!(result.contains("OOM"));
3773        assert!(result.contains("2026-01-01T00:00:00Z"));
3774    }
3775
3776    #[tokio::test]
3777    async fn test_execute_interventions_connection_refused() {
3778        let cli = Cli {
3779            node: "localhost:1".into(),
3780            token: None,
3781            command: Some(Commands::Interventions {
3782                name: "test".into(),
3783            }),
3784            path: None,
3785        };
3786        let result = execute(&cli).await;
3787        let err = result.unwrap_err().to_string();
3788        assert!(err.contains("Could not connect to pulpod"));
3789    }
3790
3791    // -- Attach command tests --
3792
3793    #[test]
3794    fn test_build_attach_command_tmux() {
3795        let cmd = build_attach_command("my-session");
3796        assert_eq!(cmd.get_program(), "tmux");
3797        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3798        assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
3799    }
3800
3801    #[test]
3802    fn test_build_attach_command_docker() {
3803        let cmd = build_attach_command("docker:pulpo-my-task");
3804        assert_eq!(cmd.get_program(), "docker");
3805        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3806        assert_eq!(args, vec!["exec", "-it", "pulpo-my-task", "/bin/sh"]);
3807    }
3808
3809    #[test]
3810    fn test_cli_parse_attach() {
3811        let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
3812        assert!(matches!(
3813            &cli.command,
3814            Some(Commands::Attach { name }) if name == "my-session"
3815        ));
3816    }
3817
3818    #[test]
3819    fn test_cli_parse_attach_alias() {
3820        let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
3821        assert!(matches!(
3822            &cli.command,
3823            Some(Commands::Attach { name }) if name == "my-session"
3824        ));
3825    }
3826
3827    #[tokio::test]
3828    async fn test_execute_attach_success() {
3829        let node = start_test_server().await;
3830        let cli = Cli {
3831            node,
3832            token: None,
3833            command: Some(Commands::Attach {
3834                name: "test-session".into(),
3835            }),
3836            path: None,
3837        };
3838        let result = execute(&cli).await.unwrap();
3839        assert!(result.contains("Detached from session test-session"));
3840    }
3841
3842    #[tokio::test]
3843    async fn test_execute_attach_with_backend_session_id() {
3844        use axum::{Router, routing::get};
3845        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3846        let app = Router::new().route(
3847            "/api/v1/sessions/{id}",
3848            get(move || async move { session_json.to_owned() }),
3849        );
3850        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3851        let addr = listener.local_addr().unwrap();
3852        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3853
3854        let cli = Cli {
3855            node: format!("127.0.0.1:{}", addr.port()),
3856            token: None,
3857            command: Some(Commands::Attach {
3858                name: "my-session".into(),
3859            }),
3860            path: None,
3861        };
3862        let result = execute(&cli).await.unwrap();
3863        assert!(result.contains("Detached from session my-session"));
3864    }
3865
3866    #[tokio::test]
3867    async fn test_execute_attach_connection_refused() {
3868        let cli = Cli {
3869            node: "localhost:1".into(),
3870            token: None,
3871            command: Some(Commands::Attach {
3872                name: "test-session".into(),
3873            }),
3874            path: None,
3875        };
3876        let result = execute(&cli).await;
3877        let err = result.unwrap_err().to_string();
3878        assert!(err.contains("Could not connect to pulpod"));
3879    }
3880
3881    #[tokio::test]
3882    async fn test_execute_attach_error_response() {
3883        use axum::{Router, http::StatusCode, routing::get};
3884        let app = Router::new().route(
3885            "/api/v1/sessions/{id}",
3886            get(|| async {
3887                (
3888                    StatusCode::NOT_FOUND,
3889                    r#"{"error":"session not found"}"#.to_owned(),
3890                )
3891            }),
3892        );
3893        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3894        let addr = listener.local_addr().unwrap();
3895        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3896
3897        let cli = Cli {
3898            node: format!("127.0.0.1:{}", addr.port()),
3899            token: None,
3900            command: Some(Commands::Attach {
3901                name: "nonexistent".into(),
3902            }),
3903            path: None,
3904        };
3905        let result = execute(&cli).await;
3906        let err = result.unwrap_err().to_string();
3907        assert!(err.contains("session not found"));
3908    }
3909
3910    #[tokio::test]
3911    async fn test_execute_attach_stale_session() {
3912        use axum::{Router, routing::get};
3913        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3914        let app = Router::new().route(
3915            "/api/v1/sessions/{id}",
3916            get(move || async move { session_json.to_owned() }),
3917        );
3918        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3919        let addr = listener.local_addr().unwrap();
3920        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3921
3922        let cli = Cli {
3923            node: format!("127.0.0.1:{}", addr.port()),
3924            token: None,
3925            command: Some(Commands::Attach {
3926                name: "stale-sess".into(),
3927            }),
3928            path: None,
3929        };
3930        let result = execute(&cli).await;
3931        let err = result.unwrap_err().to_string();
3932        assert!(err.contains("lost"));
3933        assert!(err.contains("pulpo resume"));
3934    }
3935
3936    #[tokio::test]
3937    async fn test_execute_attach_dead_session() {
3938        use axum::{Router, routing::get};
3939        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3940        let app = Router::new().route(
3941            "/api/v1/sessions/{id}",
3942            get(move || async move { session_json.to_owned() }),
3943        );
3944        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3945        let addr = listener.local_addr().unwrap();
3946        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3947
3948        let cli = Cli {
3949            node: format!("127.0.0.1:{}", addr.port()),
3950            token: None,
3951            command: Some(Commands::Attach {
3952                name: "dead-sess".into(),
3953            }),
3954            path: None,
3955        };
3956        let result = execute(&cli).await;
3957        let err = result.unwrap_err().to_string();
3958        assert!(err.contains("stopped"));
3959        assert!(err.contains("cannot attach"));
3960    }
3961
3962    // -- Alias parse tests --
3963
3964    #[test]
3965    fn test_cli_parse_alias_spawn() {
3966        let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
3967        assert!(matches!(&cli.command, Some(Commands::Spawn { .. })));
3968    }
3969
3970    #[test]
3971    fn test_cli_parse_alias_list() {
3972        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
3973        assert!(matches!(&cli.command, Some(Commands::List { all: false })));
3974    }
3975
3976    #[test]
3977    fn test_cli_parse_list_all() {
3978        let cli = Cli::try_parse_from(["pulpo", "ls", "-a"]).unwrap();
3979        assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3980
3981        let cli = Cli::try_parse_from(["pulpo", "list", "--all"]).unwrap();
3982        assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3983    }
3984
3985    #[test]
3986    fn test_cli_parse_alias_logs() {
3987        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
3988        assert!(matches!(
3989            &cli.command,
3990            Some(Commands::Logs { name, .. }) if name == "my-session"
3991        ));
3992    }
3993
3994    #[test]
3995    fn test_cli_parse_alias_stop() {
3996        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
3997        assert!(matches!(
3998            &cli.command,
3999            Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
4000        ));
4001    }
4002
4003    #[test]
4004    fn test_cli_parse_alias_resume() {
4005        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
4006        assert!(matches!(
4007            &cli.command,
4008            Some(Commands::Resume { name }) if name == "my-session"
4009        ));
4010    }
4011
4012    #[test]
4013    fn test_cli_parse_alias_nodes() {
4014        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
4015        assert!(matches!(&cli.command, Some(Commands::Nodes)));
4016    }
4017
4018    #[test]
4019    fn test_cli_parse_alias_interventions() {
4020        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
4021        assert!(matches!(
4022            &cli.command,
4023            Some(Commands::Interventions { name }) if name == "my-session"
4024        ));
4025    }
4026
4027    #[test]
4028    fn test_api_error_json() {
4029        let err = api_error("{\"error\":\"session not found: foo\"}");
4030        assert_eq!(err.to_string(), "session not found: foo");
4031    }
4032
4033    #[test]
4034    fn test_api_error_plain_text() {
4035        let err = api_error("plain text error");
4036        assert_eq!(err.to_string(), "plain text error");
4037    }
4038
4039    // -- diff_output tests --
4040
4041    #[test]
4042    fn test_diff_output_empty_prev() {
4043        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
4044    }
4045
4046    #[test]
4047    fn test_diff_output_identical() {
4048        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
4049    }
4050
4051    #[test]
4052    fn test_diff_output_new_lines_appended() {
4053        let prev = "line1\nline2";
4054        let new = "line1\nline2\nline3\nline4";
4055        assert_eq!(diff_output(prev, new), "line3\nline4");
4056    }
4057
4058    #[test]
4059    fn test_diff_output_scrolled_window() {
4060        // Window of 3 lines: old lines scroll off top, new appear at bottom
4061        let prev = "line1\nline2\nline3";
4062        let new = "line2\nline3\nline4";
4063        assert_eq!(diff_output(prev, new), "line4");
4064    }
4065
4066    #[test]
4067    fn test_diff_output_completely_different() {
4068        let prev = "aaa\nbbb";
4069        let new = "xxx\nyyy";
4070        assert_eq!(diff_output(prev, new), "xxx\nyyy");
4071    }
4072
4073    #[test]
4074    fn test_diff_output_last_line_matches_but_overlap_fails() {
4075        // Last line of prev appears in new but preceding lines don't match
4076        let prev = "aaa\ncommon";
4077        let new = "zzz\ncommon\nnew_line";
4078        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
4079        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
4080        // Falls through, no verified overlap, so returns everything
4081        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
4082    }
4083
4084    #[test]
4085    fn test_diff_output_new_empty() {
4086        assert_eq!(diff_output("line1", ""), "");
4087    }
4088
4089    // -- follow_logs tests --
4090
4091    /// Start a test server that simulates evolving output and session status transitions.
4092    /// Start a test server that simulates evolving output with agent exit marker.
4093    async fn start_follow_test_server() -> String {
4094        use axum::{Router, extract::Path, extract::Query, routing::get};
4095        use std::sync::Arc;
4096        use std::sync::atomic::{AtomicUsize, Ordering};
4097
4098        let call_count = Arc::new(AtomicUsize::new(0));
4099        let output_count = call_count.clone();
4100
4101        let app = Router::new()
4102            .route(
4103                "/api/v1/sessions/{id}/output",
4104                get(
4105                    move |_path: Path<String>,
4106                          _query: Query<std::collections::HashMap<String, String>>| {
4107                        let count = output_count.clone();
4108                        async move {
4109                            let n = count.fetch_add(1, Ordering::SeqCst);
4110                            let output = match n {
4111                                0 => "line1\nline2".to_owned(),
4112                                1 => "line1\nline2\nline3".to_owned(),
4113                                _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
4114                            };
4115                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
4116                        }
4117                    },
4118                ),
4119            )
4120            .route(
4121                "/api/v1/sessions/{id}",
4122                get(|_path: Path<String>| async {
4123                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
4124                }),
4125            );
4126
4127        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4128        let addr = listener.local_addr().unwrap();
4129        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4130        format!("http://127.0.0.1:{}", addr.port())
4131    }
4132
4133    #[tokio::test]
4134    async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
4135        let base = start_follow_test_server().await;
4136        let client = reqwest::Client::new();
4137        let mut buf = Vec::new();
4138
4139        follow_logs(&client, &base, "test", 100, None, &mut buf)
4140            .await
4141            .unwrap();
4142
4143        let output = String::from_utf8(buf).unwrap();
4144        // Should contain initial output + new lines + agent exit marker
4145        assert!(output.contains("line1"));
4146        assert!(output.contains("line2"));
4147        assert!(output.contains("line3"));
4148        assert!(output.contains("line4"));
4149        assert!(output.contains("[pulpo] Agent exited"));
4150    }
4151
4152    #[tokio::test]
4153    async fn test_execute_logs_follow_success() {
4154        let base = start_follow_test_server().await;
4155        // Extract host:port from http://127.0.0.1:PORT
4156        let node = base.strip_prefix("http://").unwrap().to_owned();
4157
4158        let cli = Cli {
4159            node,
4160            token: None,
4161            command: Some(Commands::Logs {
4162                name: "test".into(),
4163                lines: 100,
4164                follow: true,
4165            }),
4166            path: None,
4167        };
4168        // execute() with follow writes to stdout and returns empty string
4169        let result = execute(&cli).await.unwrap();
4170        assert_eq!(result, "");
4171    }
4172
4173    #[tokio::test]
4174    async fn test_execute_logs_follow_connection_refused() {
4175        let cli = Cli {
4176            node: "localhost:1".into(),
4177            token: None,
4178            command: Some(Commands::Logs {
4179                name: "test".into(),
4180                lines: 50,
4181                follow: true,
4182            }),
4183            path: None,
4184        };
4185        let result = execute(&cli).await;
4186        let err = result.unwrap_err().to_string();
4187        assert!(
4188            err.contains("Could not connect to pulpod"),
4189            "Expected friendly error, got: {err}"
4190        );
4191    }
4192
4193    #[tokio::test]
4194    async fn test_follow_logs_exits_on_dead() {
4195        use axum::{Router, extract::Path, extract::Query, routing::get};
4196
4197        let app = Router::new()
4198            .route(
4199                "/api/v1/sessions/{id}/output",
4200                get(
4201                    |_path: Path<String>,
4202                     _query: Query<std::collections::HashMap<String, String>>| async {
4203                        r#"{"output":"some output"}"#.to_owned()
4204                    },
4205                ),
4206            )
4207            .route(
4208                "/api/v1/sessions/{id}",
4209                get(|_path: Path<String>| async {
4210                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
4211                }),
4212            );
4213
4214        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4215        let addr = listener.local_addr().unwrap();
4216        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4217        let base = format!("http://127.0.0.1:{}", addr.port());
4218
4219        let client = reqwest::Client::new();
4220        let mut buf = Vec::new();
4221        follow_logs(&client, &base, "test", 100, None, &mut buf)
4222            .await
4223            .unwrap();
4224
4225        let output = String::from_utf8(buf).unwrap();
4226        assert!(output.contains("some output"));
4227    }
4228
4229    #[tokio::test]
4230    async fn test_follow_logs_exits_on_stale() {
4231        use axum::{Router, extract::Path, extract::Query, routing::get};
4232
4233        let app = Router::new()
4234            .route(
4235                "/api/v1/sessions/{id}/output",
4236                get(
4237                    |_path: Path<String>,
4238                     _query: Query<std::collections::HashMap<String, String>>| async {
4239                        r#"{"output":"stale output"}"#.to_owned()
4240                    },
4241                ),
4242            )
4243            .route(
4244                "/api/v1/sessions/{id}",
4245                get(|_path: Path<String>| async {
4246                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
4247                }),
4248            );
4249
4250        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4251        let addr = listener.local_addr().unwrap();
4252        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4253        let base = format!("http://127.0.0.1:{}", addr.port());
4254
4255        let client = reqwest::Client::new();
4256        let mut buf = Vec::new();
4257        follow_logs(&client, &base, "test", 100, None, &mut buf)
4258            .await
4259            .unwrap();
4260
4261        let output = String::from_utf8(buf).unwrap();
4262        assert!(output.contains("stale output"));
4263    }
4264
4265    #[tokio::test]
4266    async fn test_execute_logs_follow_non_reqwest_error() {
4267        use axum::{Router, extract::Path, extract::Query, routing::get};
4268
4269        // Session status endpoint returns invalid JSON to trigger a serde error
4270        let app = Router::new()
4271            .route(
4272                "/api/v1/sessions/{id}/output",
4273                get(
4274                    |_path: Path<String>,
4275                     _query: Query<std::collections::HashMap<String, String>>| async {
4276                        r#"{"output":"initial"}"#.to_owned()
4277                    },
4278                ),
4279            )
4280            .route(
4281                "/api/v1/sessions/{id}",
4282                get(|_path: Path<String>| async { "not valid json".to_owned() }),
4283            );
4284
4285        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4286        let addr = listener.local_addr().unwrap();
4287        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4288        let node = format!("127.0.0.1:{}", addr.port());
4289
4290        let cli = Cli {
4291            node,
4292            token: None,
4293            command: Some(Commands::Logs {
4294                name: "test".into(),
4295                lines: 100,
4296                follow: true,
4297            }),
4298            path: None,
4299        };
4300        let err = execute(&cli).await.unwrap_err();
4301        // serde_json error, not a reqwest error — hits the Err(other) branch
4302        let msg = err.to_string();
4303        assert!(
4304            msg.contains("expected ident"),
4305            "Expected serde parse error, got: {msg}"
4306        );
4307    }
4308
4309    #[tokio::test]
4310    async fn test_fetch_session_status_connection_error() {
4311        let client = reqwest::Client::new();
4312        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
4313        assert!(result.is_err());
4314    }
4315
4316    // -- Schedule tests --
4317
4318    #[test]
4319    fn test_format_schedules_empty() {
4320        assert_eq!(format_schedules(&[]), "No schedules.");
4321    }
4322
4323    #[test]
4324    fn test_format_schedules_with_entries() {
4325        let schedules = vec![serde_json::json!({
4326            "name": "nightly",
4327            "cron": "0 3 * * *",
4328            "enabled": true,
4329            "last_run_at": null,
4330            "target_node": null
4331        })];
4332        let output = format_schedules(&schedules);
4333        assert!(output.contains("nightly"));
4334        assert!(output.contains("0 3 * * *"));
4335        assert!(output.contains("local"));
4336        assert!(output.contains("yes"));
4337        assert!(output.contains('-'));
4338    }
4339
4340    #[test]
4341    fn test_format_schedules_disabled_entry() {
4342        let schedules = vec![serde_json::json!({
4343            "name": "weekly",
4344            "cron": "0 0 * * 0",
4345            "enabled": false,
4346            "last_run_at": "2026-03-18T03:00:00Z",
4347            "target_node": "gpu-box"
4348        })];
4349        let output = format_schedules(&schedules);
4350        assert!(output.contains("weekly"));
4351        assert!(output.contains("no"));
4352        assert!(output.contains("gpu-box"));
4353        assert!(output.contains("2026-03-18T03:00"));
4354    }
4355
4356    #[test]
4357    fn test_format_schedules_header() {
4358        let schedules = vec![serde_json::json!({
4359            "name": "test",
4360            "cron": "* * * * *",
4361            "enabled": true,
4362            "last_run_at": null,
4363            "target_node": null
4364        })];
4365        let output = format_schedules(&schedules);
4366        assert!(output.contains("NAME"));
4367        assert!(output.contains("CRON"));
4368        assert!(output.contains("ENABLED"));
4369        assert!(output.contains("LAST RUN"));
4370        assert!(output.contains("NODE"));
4371    }
4372
4373    // -- Schedule CLI parse tests --
4374
4375    #[test]
4376    fn test_cli_parse_schedule_add() {
4377        let cli = Cli::try_parse_from([
4378            "pulpo",
4379            "schedule",
4380            "add",
4381            "nightly",
4382            "0 3 * * *",
4383            "--workdir",
4384            "/repo",
4385            "--",
4386            "claude",
4387            "-p",
4388            "review",
4389        ])
4390        .unwrap();
4391        assert!(matches!(
4392            &cli.command,
4393            Some(Commands::Schedule {
4394                action: ScheduleAction::Add { name, cron, .. }
4395            }) if name == "nightly" && cron == "0 3 * * *"
4396        ));
4397    }
4398
4399    #[test]
4400    fn test_cli_parse_schedule_add_with_node() {
4401        let cli = Cli::try_parse_from([
4402            "pulpo",
4403            "schedule",
4404            "add",
4405            "nightly",
4406            "0 3 * * *",
4407            "--workdir",
4408            "/repo",
4409            "--node",
4410            "gpu-box",
4411            "--",
4412            "claude",
4413        ])
4414        .unwrap();
4415        assert!(matches!(
4416            &cli.command,
4417            Some(Commands::Schedule {
4418                action: ScheduleAction::Add { node, .. }
4419            }) if node.as_deref() == Some("gpu-box")
4420        ));
4421    }
4422
4423    #[test]
4424    fn test_cli_parse_schedule_add_install_alias() {
4425        let cli =
4426            Cli::try_parse_from(["pulpo", "schedule", "install", "nightly", "0 3 * * *"]).unwrap();
4427        assert!(matches!(
4428            &cli.command,
4429            Some(Commands::Schedule {
4430                action: ScheduleAction::Add { name, .. }
4431            }) if name == "nightly"
4432        ));
4433    }
4434
4435    #[test]
4436    fn test_cli_parse_schedule_list() {
4437        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
4438        assert!(matches!(
4439            &cli.command,
4440            Some(Commands::Schedule {
4441                action: ScheduleAction::List
4442            })
4443        ));
4444    }
4445
4446    #[test]
4447    fn test_cli_parse_schedule_remove() {
4448        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
4449        assert!(matches!(
4450            &cli.command,
4451            Some(Commands::Schedule {
4452                action: ScheduleAction::Remove { name }
4453            }) if name == "nightly"
4454        ));
4455    }
4456
4457    #[test]
4458    fn test_cli_parse_schedule_pause() {
4459        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
4460        assert!(matches!(
4461            &cli.command,
4462            Some(Commands::Schedule {
4463                action: ScheduleAction::Pause { name }
4464            }) if name == "nightly"
4465        ));
4466    }
4467
4468    #[test]
4469    fn test_cli_parse_schedule_resume() {
4470        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
4471        assert!(matches!(
4472            &cli.command,
4473            Some(Commands::Schedule {
4474                action: ScheduleAction::Resume { name }
4475            }) if name == "nightly"
4476        ));
4477    }
4478
4479    #[test]
4480    fn test_cli_parse_schedule_alias() {
4481        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
4482        assert!(matches!(
4483            &cli.command,
4484            Some(Commands::Schedule {
4485                action: ScheduleAction::List
4486            })
4487        ));
4488    }
4489
4490    #[test]
4491    fn test_cli_parse_schedule_list_alias() {
4492        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
4493        assert!(matches!(
4494            &cli.command,
4495            Some(Commands::Schedule {
4496                action: ScheduleAction::List
4497            })
4498        ));
4499    }
4500
4501    #[test]
4502    fn test_cli_parse_schedule_remove_alias() {
4503        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
4504        assert!(matches!(
4505            &cli.command,
4506            Some(Commands::Schedule {
4507                action: ScheduleAction::Remove { name }
4508            }) if name == "nightly"
4509        ));
4510    }
4511
4512    #[tokio::test]
4513    async fn test_execute_schedule_list_via_execute() {
4514        let node = start_test_server().await;
4515        let cli = Cli {
4516            node,
4517            token: None,
4518            command: Some(Commands::Schedule {
4519                action: ScheduleAction::List,
4520            }),
4521            path: None,
4522        };
4523        let result = execute(&cli).await.unwrap();
4524        // Under coverage, execute_schedule is a stub that returns empty string
4525        #[cfg(coverage)]
4526        assert!(result.is_empty());
4527        #[cfg(not(coverage))]
4528        assert_eq!(result, "No schedules.");
4529    }
4530
4531    #[test]
4532    fn test_schedule_action_debug() {
4533        let action = ScheduleAction::List;
4534        assert_eq!(format!("{action:?}"), "List");
4535    }
4536
4537    #[test]
4538    fn test_cli_parse_send_alias() {
4539        let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
4540        assert!(matches!(
4541            &cli.command,
4542            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
4543        ));
4544    }
4545
4546    #[test]
4547    fn test_cli_parse_spawn_no_name() {
4548        let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
4549        assert!(matches!(
4550            &cli.command,
4551            Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
4552        ));
4553    }
4554
4555    #[test]
4556    fn test_cli_parse_spawn_optional_name_with_command() {
4557        let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
4558        assert!(matches!(
4559            &cli.command,
4560            Some(Commands::Spawn { name, command, .. })
4561                if name.is_none() && command == &["echo", "hello"]
4562        ));
4563    }
4564
4565    #[test]
4566    fn test_cli_parse_path_shortcut() {
4567        let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
4568        assert!(cli.command.is_none());
4569        assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
4570    }
4571
4572    #[test]
4573    fn test_cli_parse_no_args() {
4574        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
4575        assert!(cli.command.is_none());
4576        assert!(cli.path.is_none());
4577    }
4578
4579    #[test]
4580    fn test_derive_session_name_simple() {
4581        assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
4582    }
4583
4584    #[test]
4585    fn test_derive_session_name_with_special_chars() {
4586        assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
4587    }
4588
4589    #[test]
4590    fn test_derive_session_name_root() {
4591        assert_eq!(derive_session_name("/"), "session");
4592    }
4593
4594    #[test]
4595    fn test_derive_session_name_dots() {
4596        assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
4597    }
4598
4599    #[test]
4600    fn test_resolve_path_absolute() {
4601        assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
4602    }
4603
4604    #[test]
4605    fn test_resolve_path_relative() {
4606        let resolved = resolve_path("my-repo");
4607        assert!(resolved.ends_with("my-repo"));
4608        assert!(resolved.starts_with('/'));
4609    }
4610
4611    #[tokio::test]
4612    async fn test_execute_no_args_shows_help() {
4613        let node = start_test_server().await;
4614        let cli = Cli {
4615            node,
4616            token: None,
4617            path: None,
4618            command: None,
4619        };
4620        let result = execute(&cli).await.unwrap();
4621        assert!(
4622            result.is_empty(),
4623            "no-args should return empty string after printing help"
4624        );
4625    }
4626
4627    #[tokio::test]
4628    async fn test_execute_path_shortcut() {
4629        let node = start_test_server().await;
4630        let cli = Cli {
4631            node,
4632            token: None,
4633            path: Some("/tmp".into()),
4634            command: None,
4635        };
4636        let result = execute(&cli).await.unwrap();
4637        assert!(result.contains("Detached from session"));
4638    }
4639
4640    #[tokio::test]
4641    async fn test_deduplicate_session_name_no_conflict() {
4642        // Connection refused → falls through to "name not taken" path
4643        let base = "http://127.0.0.1:1";
4644        let client = reqwest::Client::new();
4645        let name = deduplicate_session_name(&client, base, "fresh", None).await;
4646        assert_eq!(name, "fresh");
4647    }
4648
4649    #[tokio::test]
4650    async fn test_deduplicate_session_name_with_conflict() {
4651        use axum::{Router, routing::get};
4652        use std::sync::atomic::{AtomicU32, Ordering};
4653
4654        let call_count = std::sync::Arc::new(AtomicU32::new(0));
4655        let counter = call_count.clone();
4656        let app = Router::new()
4657            .route(
4658                "/api/v1/sessions/{id}",
4659                get(move || {
4660                    let c = counter.clone();
4661                    async move {
4662                        let n = c.fetch_add(1, Ordering::SeqCst);
4663                        if n == 0 {
4664                            // First call (base name) → exists
4665                            (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4666                        } else {
4667                            // Suffixed name → not found
4668                            (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4669                        }
4670                    }
4671                }),
4672            )
4673            .route(
4674                "/api/v1/peers",
4675                get(|| async {
4676                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4677                }),
4678            );
4679        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4680        let addr = listener.local_addr().unwrap();
4681        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4682        let base = format!("http://127.0.0.1:{}", addr.port());
4683        let client = reqwest::Client::new();
4684        let name = deduplicate_session_name(&client, &base, "repo", None).await;
4685        assert_eq!(name, "repo-2");
4686    }
4687
4688    // -- Node resolution tests --
4689
4690    #[test]
4691    fn test_node_needs_resolution() {
4692        assert!(!node_needs_resolution("localhost:7433"));
4693        assert!(!node_needs_resolution("mac-mini:7433"));
4694        assert!(!node_needs_resolution("10.0.0.1:7433"));
4695        assert!(!node_needs_resolution("[::1]:7433"));
4696        assert!(node_needs_resolution("mac-mini"));
4697        assert!(node_needs_resolution("linux-server"));
4698        assert!(node_needs_resolution("localhost"));
4699    }
4700
4701    #[tokio::test]
4702    async fn test_resolve_node_with_port() {
4703        let client = reqwest::Client::new();
4704        let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4705        assert_eq!(addr, "mac-mini:7433");
4706        assert!(token.is_none());
4707    }
4708
4709    #[tokio::test]
4710    async fn test_resolve_node_fallback_appends_port() {
4711        // No local daemon running on localhost:7433, so peer lookup fails
4712        // and it falls back to appending :7433
4713        let client = reqwest::Client::new();
4714        let (addr, token) = resolve_node(&client, "unknown-host").await;
4715        assert_eq!(addr, "unknown-host:7433");
4716        assert!(token.is_none());
4717    }
4718
4719    #[cfg(not(coverage))]
4720    #[tokio::test]
4721    async fn test_resolve_node_finds_peer() {
4722        use axum::{Router, routing::get};
4723
4724        let app = Router::new()
4725            .route(
4726                "/api/v1/peers",
4727                get(|| async {
4728                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4729                }),
4730            )
4731            .route(
4732                "/api/v1/config",
4733                get(|| async {
4734                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4735                }),
4736            );
4737
4738        // Port 7433 may be in use; skip test if so
4739        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4740            return;
4741        };
4742        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4743
4744        let client = reqwest::Client::new();
4745        let (addr, token) = resolve_node(&client, "mac-mini").await;
4746        assert_eq!(addr, "10.0.0.5:7433");
4747        assert_eq!(token, Some("peer-secret".into()));
4748    }
4749
4750    #[cfg(not(coverage))]
4751    #[tokio::test]
4752    async fn test_resolve_node_peer_no_token() {
4753        use axum::{Router, routing::get};
4754
4755        let app = Router::new()
4756            .route(
4757                "/api/v1/peers",
4758                get(|| async {
4759                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4760                }),
4761            )
4762            .route(
4763                "/api/v1/config",
4764                get(|| async {
4765                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4766                }),
4767            );
4768
4769        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4770            return; // Port in use, skip
4771        };
4772        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4773
4774        let client = reqwest::Client::new();
4775        let (addr, token) = resolve_node(&client, "test-peer").await;
4776        assert_eq!(addr, "10.0.0.9:7433");
4777        assert!(token.is_none()); // Simple peer entry has no token
4778    }
4779
4780    #[tokio::test]
4781    async fn test_execute_with_peer_name_resolution() {
4782        // When node doesn't contain ':', resolve_node is called.
4783        // Since there's no local daemon on port 7433, it falls back to appending :7433.
4784        // The connection to the fallback address will fail, giving us a connection error.
4785        let cli = Cli {
4786            node: "nonexistent-peer".into(),
4787            token: None,
4788            command: Some(Commands::List { all: false }),
4789            path: None,
4790        };
4791        let result = execute(&cli).await;
4792        // Should try to connect to nonexistent-peer:7433 and fail
4793        assert!(result.is_err());
4794    }
4795
4796    // -- Auto node selection tests --
4797
4798    #[test]
4799    fn test_cli_parse_spawn_auto() {
4800        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
4801        assert!(matches!(
4802            &cli.command,
4803            Some(Commands::Spawn { auto, .. }) if *auto
4804        ));
4805    }
4806
4807    #[test]
4808    fn test_cli_parse_spawn_auto_default() {
4809        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
4810        assert!(matches!(
4811            &cli.command,
4812            Some(Commands::Spawn { auto, .. }) if !auto
4813        ));
4814    }
4815
4816    #[tokio::test]
4817    async fn test_select_best_node_coverage_stub() {
4818        // Exercise the coverage stub (or real function in non-coverage builds)
4819        let client = reqwest::Client::new();
4820        // In coverage builds, the stub returns ("localhost:7433", "local")
4821        // In non-coverage builds, this fails because no server is running — that's OK
4822        let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4823    }
4824
4825    #[cfg(not(coverage))]
4826    #[tokio::test]
4827    async fn test_select_best_node_picks_least_loaded() {
4828        use axum::{Router, routing::get};
4829
4830        let app = Router::new().route(
4831            "/api/v1/peers",
4832            get(|| async {
4833                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4834            }),
4835        );
4836        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4837        let addr = listener.local_addr().unwrap();
4838        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4839        let base = format!("http://127.0.0.1:{}", addr.port());
4840
4841        let client = reqwest::Client::new();
4842        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4843        // "idle" has 1 session + 16384 MB → score = 1 - 16 = -15
4844        // "busy" has 5 sessions + 8192 MB → score = 5 - 8 = -3
4845        // idle wins (lower score)
4846        assert_eq!(name, "idle");
4847        assert_eq!(addr, "idle:7433");
4848    }
4849
4850    #[cfg(not(coverage))]
4851    #[tokio::test]
4852    async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4853        use axum::{Router, routing::get};
4854
4855        let app = Router::new().route(
4856            "/api/v1/peers",
4857            get(|| async {
4858                r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4859            }),
4860        );
4861        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4862        let addr = listener.local_addr().unwrap();
4863        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4864        let base = format!("http://127.0.0.1:{}", addr.port());
4865
4866        let client = reqwest::Client::new();
4867        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4868        assert_eq!(name, "my-mac");
4869        assert_eq!(addr, "localhost:7433");
4870    }
4871
4872    #[cfg(not(coverage))]
4873    #[tokio::test]
4874    async fn test_select_best_node_empty_peers_falls_back_to_local() {
4875        use axum::{Router, routing::get};
4876
4877        let app = Router::new().route(
4878            "/api/v1/peers",
4879            get(|| async {
4880                r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4881            }),
4882        );
4883        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4884        let addr = listener.local_addr().unwrap();
4885        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4886        let base = format!("http://127.0.0.1:{}", addr.port());
4887
4888        let client = reqwest::Client::new();
4889        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4890        assert_eq!(name, "solo");
4891        assert_eq!(addr, "localhost:7433");
4892    }
4893
4894    #[cfg(not(coverage))]
4895    #[tokio::test]
4896    async fn test_execute_spawn_auto_selects_node() {
4897        use axum::{
4898            Router,
4899            http::StatusCode,
4900            routing::{get, post},
4901        };
4902
4903        let create_json = test_create_response_json();
4904
4905        // Bind early so we know the address to embed in the peers response
4906        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4907        let addr = listener.local_addr().unwrap();
4908        let node = format!("127.0.0.1:{}", addr.port());
4909        let peer_addr = node.clone();
4910
4911        let app = Router::new()
4912            .route(
4913                "/api/v1/peers",
4914                get(move || {
4915                    let peer_addr = peer_addr.clone();
4916                    async move {
4917                        format!(
4918                            r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
4919                        )
4920                    }
4921                }),
4922            )
4923            .route(
4924                "/api/v1/sessions",
4925                post(move || async move {
4926                    (StatusCode::CREATED, create_json.clone())
4927                }),
4928            );
4929
4930        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4931
4932        let cli = Cli {
4933            node,
4934            token: None,
4935            command: Some(Commands::Spawn {
4936                name: Some("test".into()),
4937                workdir: Some("/tmp/repo".into()),
4938                ink: None,
4939                description: None,
4940                detach: true,
4941                idle_threshold: None,
4942                auto: true,
4943                worktree: false,
4944                worktree_base: None,
4945                runtime: None,
4946                secret: vec![],
4947                command: vec!["echo".into(), "hello".into()],
4948            }),
4949            path: None,
4950        };
4951        let result = execute(&cli).await.unwrap();
4952        assert!(result.contains("Created session"));
4953    }
4954
4955    #[cfg(not(coverage))]
4956    #[tokio::test]
4957    async fn test_select_best_node_peer_no_session_count() {
4958        use axum::{Router, routing::get};
4959
4960        let app = Router::new().route(
4961            "/api/v1/peers",
4962            get(|| async {
4963                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4964            }),
4965        );
4966        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4967        let addr = listener.local_addr().unwrap();
4968        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4969        let base = format!("http://127.0.0.1:{}", addr.port());
4970
4971        let client = reqwest::Client::new();
4972        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4973        // Online peer with no session_count (0) and no node_info (0 mem) → score = 0
4974        assert_eq!(name, "fresh");
4975        assert_eq!(addr, "fresh:7433");
4976    }
4977
4978    // -- Secret CLI parse tests --
4979
4980    #[test]
4981    fn test_cli_parse_secret_set() {
4982        let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_TOKEN", "abc123"]).unwrap();
4983        assert!(matches!(
4984            &cli.command,
4985            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
4986                if name == "MY_TOKEN" && value == "abc123" && env.is_none()
4987        ));
4988    }
4989
4990    #[test]
4991    fn test_cli_parse_secret_list() {
4992        let cli = Cli::try_parse_from(["pulpo", "secret", "list"]).unwrap();
4993        assert!(matches!(
4994            &cli.command,
4995            Some(Commands::Secret {
4996                action: SecretAction::List
4997            })
4998        ));
4999    }
5000
5001    #[test]
5002    fn test_cli_parse_secret_list_alias() {
5003        let cli = Cli::try_parse_from(["pulpo", "secret", "ls"]).unwrap();
5004        assert!(matches!(
5005            &cli.command,
5006            Some(Commands::Secret {
5007                action: SecretAction::List
5008            })
5009        ));
5010    }
5011
5012    #[test]
5013    fn test_cli_parse_secret_delete() {
5014        let cli = Cli::try_parse_from(["pulpo", "secret", "delete", "MY_TOKEN"]).unwrap();
5015        assert!(matches!(
5016            &cli.command,
5017            Some(Commands::Secret { action: SecretAction::Delete { name } })
5018                if name == "MY_TOKEN"
5019        ));
5020    }
5021
5022    #[test]
5023    fn test_cli_parse_secret_delete_alias() {
5024        let cli = Cli::try_parse_from(["pulpo", "secret", "rm", "MY_TOKEN"]).unwrap();
5025        assert!(matches!(
5026            &cli.command,
5027            Some(Commands::Secret { action: SecretAction::Delete { name } })
5028                if name == "MY_TOKEN"
5029        ));
5030    }
5031
5032    #[test]
5033    fn test_cli_parse_secret_alias() {
5034        let cli = Cli::try_parse_from(["pulpo", "sec", "list"]).unwrap();
5035        assert!(matches!(
5036            &cli.command,
5037            Some(Commands::Secret {
5038                action: SecretAction::List
5039            })
5040        ));
5041    }
5042
5043    #[test]
5044    fn test_format_secrets_empty() {
5045        let secrets: Vec<serde_json::Value> = vec![];
5046        assert_eq!(format_secrets(&secrets), "No secrets configured.");
5047    }
5048
5049    #[test]
5050    fn test_format_secrets_with_entries() {
5051        let secrets = vec![
5052            serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
5053            serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
5054        ];
5055        let output = format_secrets(&secrets);
5056        assert!(output.contains("GITHUB_TOKEN"));
5057        assert!(output.contains("NPM_TOKEN"));
5058        assert!(output.contains("NAME"));
5059        assert!(output.contains("ENV"));
5060        assert!(output.contains("CREATED"));
5061    }
5062
5063    #[test]
5064    fn test_format_secrets_with_env() {
5065        let secrets = vec![
5066            serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
5067            serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
5068        ];
5069        let output = format_secrets(&secrets);
5070        assert!(output.contains("GH_WORK"));
5071        assert!(output.contains("GITHUB_TOKEN"));
5072        assert!(output.contains("NPM_TOKEN"));
5073    }
5074
5075    #[test]
5076    fn test_format_secrets_short_timestamp() {
5077        let secrets = vec![serde_json::json!({"name": "KEY", "created_at": "now"})];
5078        let output = format_secrets(&secrets);
5079        assert!(output.contains("now"));
5080    }
5081
5082    #[test]
5083    fn test_format_schedules_short_last_run_at() {
5084        // Regression: last_run_at shorter than 16 chars must not panic
5085        let schedules = vec![serde_json::json!({
5086            "name": "test",
5087            "cron": "* * * * *",
5088            "enabled": true,
5089            "last_run_at": "short",
5090            "target_node": null
5091        })];
5092        let output = format_schedules(&schedules);
5093        assert!(output.contains("short"));
5094    }
5095
5096    #[test]
5097    fn test_format_sessions_multibyte_command_truncation() {
5098        use chrono::Utc;
5099        use pulpo_common::session::SessionStatus;
5100        use uuid::Uuid;
5101
5102        // Command with multi-byte chars exceeding 50 bytes; must not panic
5103        let sessions = vec![Session {
5104            id: Uuid::nil(),
5105            name: "test".into(),
5106            workdir: "/tmp".into(),
5107            command: "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'".into(),
5108            description: None,
5109            status: SessionStatus::Active,
5110            exit_code: None,
5111            backend_session_id: None,
5112            output_snapshot: None,
5113            metadata: None,
5114            ink: None,
5115            intervention_code: None,
5116            intervention_reason: None,
5117            intervention_at: None,
5118            last_output_at: None,
5119            idle_since: None,
5120            idle_threshold_secs: None,
5121            worktree_path: None,
5122            worktree_branch: None,
5123            git_branch: None,
5124            git_commit: None,
5125            git_files_changed: None,
5126            git_insertions: None,
5127            git_deletions: None,
5128            git_ahead: None,
5129            runtime: Runtime::Tmux,
5130            created_at: Utc::now(),
5131            updated_at: Utc::now(),
5132        }];
5133        let output = format_sessions(&sessions);
5134        assert!(output.contains("..."));
5135    }
5136}