Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5    AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6    PeersResponse,
7};
8use pulpo_common::session::{Runtime, Session};
9
// Top-level command-line arguments for the `pulpo` binary, parsed with clap's derive API.
// NOTE(review): the `///` comments on the fields below are presumably surfaced by clap as
// `--help` text, so treat them as user-facing strings when editing.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version = env!("PULPO_VERSION"),
    args_conflicts_with_subcommands = true
)]
pub struct Cli {
    /// Target node (default: localhost)
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    /// Auth token (auto-discovered from local daemon if omitted)
    #[arg(long)]
    pub token: Option<String>,

    // `None` when no subcommand was given (e.g. the quick-spawn `pulpo <path>` form).
    #[command(subcommand)]
    pub command: Option<Commands>,

    /// Quick spawn: `pulpo <path>` spawns a session in that directory
    #[arg(value_name = "PATH")]
    pub path: Option<String>,
}
33
// Top-level subcommands of the `pulpo` CLI.
// NOTE(review): the `///` comments on variants and fields are presumably rendered by clap as
// help text, so they are user-facing strings rather than plain documentation.
#[derive(Subcommand, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    /// Attach to a session's terminal
    #[command(visible_alias = "a")]
    Attach {
        /// Session name or ID
        name: String,
    },

    /// Send input to a session
    #[command(visible_alias = "i", visible_alias = "send")]
    Input {
        /// Session name or ID
        name: String,
        /// Text to send (sends Enter if omitted)
        text: Option<String>,
    },

    /// Spawn a new agent session
    #[command(visible_alias = "s")]
    Spawn {
        /// Session name (auto-generated from workdir if omitted)
        name: Option<String>,

        /// Working directory (defaults to current directory)
        #[arg(long)]
        workdir: Option<String>,

        /// Ink name (from config)
        #[arg(long)]
        ink: Option<String>,

        /// Human-readable description of the task
        #[arg(long)]
        description: Option<String>,

        /// Don't attach to the session after spawning
        #[arg(short, long)]
        detach: bool,

        /// Idle threshold in seconds (0 = never idle)
        #[arg(long)]
        idle_threshold: Option<u32>,

        /// Auto-select the least loaded node
        #[arg(long)]
        auto: bool,

        /// Create an isolated git worktree for the session
        #[arg(long)]
        worktree: bool,

        /// Base branch to fork the worktree from (implies --worktree)
        #[arg(long = "worktree-base")]
        worktree_base: Option<String>,

        /// Runtime environment: tmux (default) or docker
        #[arg(long)]
        runtime: Option<String>,

        /// Secrets to inject as environment variables (by name)
        #[arg(long)]
        secret: Vec<String>,

        /// Command to run (everything after --)
        #[arg(last = true)]
        command: Vec<String>,
    },

    /// List sessions (live only by default)
    #[command(visible_alias = "ls")]
    List {
        /// Show all sessions including stopped and lost
        #[arg(short, long)]
        all: bool,
    },

    /// Show session logs/output
    #[command(visible_alias = "l")]
    Logs {
        /// Session name or ID
        name: String,

        /// Number of lines to fetch
        #[arg(long, default_value = "100")]
        lines: usize,

        /// Follow output (like `tail -f`)
        #[arg(short, long)]
        follow: bool,
    },

    /// Stop one or more sessions
    // `kill` is a hidden alias (plain `alias`), unlike the visible `k`.
    #[command(visible_alias = "k", alias = "kill")]
    Stop {
        /// Session names or IDs
        #[arg(required = true)]
        names: Vec<String>,

        /// Also purge the session from history
        #[arg(long, short = 'p')]
        purge: bool,
    },

    /// Remove all stopped and lost sessions
    Cleanup,

    /// Resume a lost session
    #[command(visible_alias = "r")]
    Resume {
        /// Session name or ID
        name: String,
    },

    /// List all known nodes
    #[command(visible_alias = "n")]
    Nodes,

    /// Show intervention history for a session
    #[command(visible_alias = "iv")]
    Interventions {
        /// Session name or ID
        name: String,
    },

    /// Open the web dashboard in your browser
    Ui,

    /// Manage scheduled agent runs
    #[command(visible_alias = "sched")]
    Schedule {
        #[command(subcommand)]
        action: ScheduleAction,
    },

    /// Manage secrets (environment variables injected into sessions)
    #[command(visible_alias = "sec")]
    Secret {
        #[command(subcommand)]
        action: SecretAction,
    },

    /// Manage git worktrees for sessions
    #[command(visible_alias = "wt")]
    Worktree {
        #[command(subcommand)]
        action: WorktreeAction,
    },
}
184
// Nested subcommands under `pulpo secret`.
// NOTE(review): `///` comments here are presumably rendered by clap as help text.
#[derive(Subcommand, Debug)]
pub enum SecretAction {
    /// Set a secret
    Set {
        /// Secret name (will be the env var name, uppercase + underscores)
        name: String,
        /// Secret value
        value: String,
        /// Environment variable name (defaults to secret name)
        #[arg(long)]
        env: Option<String>,
    },
    /// List secret names
    #[command(visible_alias = "ls")]
    List,
    /// Delete a secret
    #[command(visible_alias = "rm")]
    Delete {
        /// Secret name
        name: String,
    },
}
207
// Nested subcommands under `pulpo worktree`; currently only listing is defined.
// NOTE(review): `///` comments here are presumably rendered by clap as help text.
#[derive(Subcommand, Debug)]
pub enum WorktreeAction {
    /// List sessions that use git worktrees
    #[command(visible_alias = "ls")]
    List,
}
214
// Nested subcommands under `pulpo schedule`.
// NOTE(review): `///` comments here are presumably rendered by clap as help text.
// The aliases in this enum use plain `alias` (hidden), unlike the `visible_alias`
// used by the other command enums in this file.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    /// Add a new schedule
    #[command(alias = "install")]
    Add {
        /// Schedule name
        name: String,
        /// Cron expression (e.g. "0 3 * * *")
        cron: String,
        /// Working directory
        #[arg(long)]
        workdir: Option<String>,
        /// Target node (omit = local, "auto" = least-loaded)
        #[arg(long)]
        node: Option<String>,
        /// Ink preset
        #[arg(long)]
        ink: Option<String>,
        /// Description
        #[arg(long)]
        description: Option<String>,
        /// Command to run (everything after --)
        #[arg(last = true)]
        command: Vec<String>,
    },
    /// List all schedules
    #[command(alias = "ls")]
    List,
    /// Remove a schedule
    #[command(alias = "rm")]
    Remove {
        /// Schedule name or ID
        name: String,
    },
    /// Pause a schedule
    Pause {
        /// Schedule name or ID
        name: String,
    },
    /// Resume a paused schedule
    Resume {
        /// Schedule name or ID
        name: String,
    },
}
260
/// The marker emitted by the agent wrapper when the agent process exits.
///
/// `follow_logs` stops polling as soon as the fetched output contains this text.
const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
263
/// Turn `path` into an absolute path string.
///
/// Absolute inputs are returned unchanged. Relative inputs are joined onto the
/// current working directory; if the working directory cannot be determined the
/// input is returned as-is. The path is not canonicalized or checked for existence.
fn resolve_path(path: &str) -> String {
    let candidate = std::path::Path::new(path);
    if candidate.is_absolute() {
        return path.to_owned();
    }
    match std::env::current_dir() {
        Ok(cwd) => cwd.join(candidate).to_string_lossy().into_owned(),
        // No usable working directory: fall back to the caller's input.
        Err(_) => path.to_owned(),
    }
}
276
/// Build a kebab-case session name from the last component of `path`.
///
/// ASCII letters and digits are lowercased and kept; every other character becomes
/// a hyphen. Runs of hyphens are collapsed and leading/trailing hyphens dropped.
/// Returns `"session"` when the path has no usable final component or nothing
/// survives the cleanup.
fn derive_session_name(path: &str) -> String {
    let leaf = match std::path::Path::new(path)
        .file_name()
        .and_then(|n| n.to_str())
    {
        Some(leaf) => leaf,
        None => "session",
    };

    // Single pass: a hyphen is only emitted after a kept character, which both
    // skips leading hyphens and collapses consecutive ones.
    let mut slug = String::with_capacity(leaf.len());
    for ch in leaf.chars() {
        if ch.is_ascii_alphanumeric() {
            slug.push(ch.to_ascii_lowercase());
        } else if !slug.is_empty() && !slug.ends_with('-') {
            slug.push('-');
        }
    }
    // At most one trailing hyphen can remain at this point.
    if slug.ends_with('-') {
        slug.pop();
    }

    if slug.is_empty() {
        "session".to_owned()
    } else {
        slug
    }
}
309
310/// Deduplicate a session name by appending `-2`, `-3`, etc. if the base name is active.
311async fn deduplicate_session_name(
312    client: &reqwest::Client,
313    base: &str,
314    name: &str,
315    token: Option<&str>,
316) -> String {
317    // Check if the name is already taken by fetching the session
318    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
319        .send()
320        .await;
321    match resp {
322        Ok(r) if r.status().is_success() => {
323            // Session exists — try suffixed names
324            for i in 2..=99 {
325                let candidate = format!("{name}-{i}");
326                let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
327                    .send()
328                    .await;
329                match resp {
330                    Ok(r) if r.status().is_success() => {}
331                    _ => return candidate,
332                }
333            }
334            format!("{name}-100")
335        }
336        _ => name.to_owned(),
337    }
338}
339
/// Build the HTTP base URL for a node address by prefixing it with `http://`.
///
/// `node` is used verbatim; no validation or normalization is performed.
pub fn base_url(node: &str) -> String {
    ["http://", node].concat()
}
344
/// Response shape for the output endpoint.
///
/// Deserialized from the JSON body in `fetch_output`.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// Captured session output text.
    output: String,
}
350
351/// Format a list of sessions as a table.
352const fn session_runtime(session: &Session) -> &'static str {
353    match session.runtime {
354        Runtime::Tmux => "tmux",
355        Runtime::Docker => "docker",
356    }
357}
358
359fn format_sessions(sessions: &[Session]) -> String {
360    if sessions.is_empty() {
361        return "No sessions.".into();
362    }
363    let mut lines = vec![format!(
364        "{:<10} {:<24} {:<12} {:<8} {}",
365        "ID", "NAME", "STATUS", "RUNTIME", "COMMAND"
366    )];
367    for s in sessions {
368        let cmd_display = if s.command.len() > 50 {
369            let truncated: String = s.command.chars().take(47).collect();
370            format!("{truncated}...")
371        } else {
372            s.command.clone()
373        };
374        let short_id = &s.id.to_string()[..8];
375        let has_pr = s
376            .metadata
377            .as_ref()
378            .is_some_and(|m| m.contains_key("pr_url"));
379        let name_display = match (s.worktree_path.is_some(), has_pr) {
380            (true, true) => format!("{} [wt] [PR]", s.name),
381            (true, false) => format!("{} [wt]", s.name),
382            (false, true) => format!("{} [PR]", s.name),
383            (false, false) => s.name.clone(),
384        };
385        lines.push(format!(
386            "{:<10} {:<24} {:<12} {:<8} {}",
387            short_id,
388            name_display,
389            s.status,
390            session_runtime(s),
391            cmd_display
392        ));
393    }
394    lines.join("\n")
395}
396
397/// Format the peers response as a table.
398fn format_nodes(resp: &PeersResponse) -> String {
399    let mut lines = vec![format!(
400        "{:<20} {:<25} {:<10} {}",
401        "NAME", "ADDRESS", "STATUS", "SESSIONS"
402    )];
403    lines.push(format!(
404        "{:<20} {:<25} {:<10} {}",
405        resp.local.name, "(local)", "online", "-"
406    ));
407    for p in &resp.peers {
408        let sessions = p
409            .session_count
410            .map_or_else(|| "-".into(), |c| c.to_string());
411        lines.push(format!(
412            "{:<20} {:<25} {:<10} {}",
413            p.name, p.address, p.status, sessions
414        ));
415    }
416    lines.join("\n")
417}
418
419/// Format intervention events as a table.
420fn format_interventions(events: &[InterventionEventResponse]) -> String {
421    if events.is_empty() {
422        return "No intervention events.".into();
423    }
424    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
425    for e in events {
426        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
427    }
428    lines.join("\n")
429}
430
/// Construct (but do not run) the platform command that opens `url` in the
/// default browser.
///
/// Uses `open` on macOS, `cmd /C start` on Windows, and `xdg-open` on Linux and
/// every other target.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    // Exactly one of these bindings survives conditional compilation.
    #[cfg(target_os = "macos")]
    let (program, leading_args): (&str, &[&str]) = ("open", &[]);
    #[cfg(target_os = "windows")]
    let (program, leading_args): (&str, &[&str]) = ("cmd", &["/C", "start"]);
    #[cfg(not(any(target_os = "macos", target_os = "windows")))]
    let (program, leading_args): (&str, &[&str]) = ("xdg-open", &[]);

    let mut cmd = std::process::Command::new(program);
    cmd.args(leading_args).arg(url);
    cmd
}
460
461/// Open a URL in the default browser.
462#[cfg(not(coverage))]
463fn open_browser(url: &str) -> Result<()> {
464    build_open_command(url).status()?;
465    Ok(())
466}
467
/// Stub for coverage builds — avoids opening a browser during tests.
///
/// Ignores the URL and always returns `Ok(())`.
#[cfg(coverage)]
fn open_browser(_url: &str) -> Result<()> {
    Ok(())
}
473
/// Construct (but do not run) the command that attaches to a session's terminal.
///
/// A backend session ID starting with `docker:` yields
/// `docker exec -it <container> /bin/sh`, where `<container>` is the remainder of
/// the ID. Any other ID yields `tmux attach-session -t <id>`, except on Windows
/// where an `echo` command explaining that attach is unavailable is returned instead.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    if let Some(container) = backend_session_id.strip_prefix("docker:") {
        let mut docker = std::process::Command::new("docker");
        docker.args(["exec", "-it", container, "/bin/sh"]);
        return docker;
    }

    // Non-Docker sessions: exactly one of these bindings survives conditional compilation.
    #[cfg(not(target_os = "windows"))]
    let (program, args) = ("tmux", ["attach-session", "-t", backend_session_id]);
    #[cfg(target_os = "windows")]
    let (program, args) = (
        "cmd",
        [
            "/C",
            "echo",
            "Attach not available on Windows. Use the web UI or --runtime docker.",
        ],
    );

    let mut cmd = std::process::Command::new(program);
    cmd.args(args);
    cmd
}
503
504/// Attach to a session's terminal.
505#[cfg(not(any(test, coverage, target_os = "windows")))]
506fn attach_session(backend_session_id: &str) -> Result<()> {
507    let status = build_attach_command(backend_session_id).status()?;
508    if !status.success() {
509        anyhow::bail!("attach failed with {status}");
510    }
511    Ok(())
512}
513
/// Stub for Windows — tmux attach is not available.
///
/// Prints an explanatory message to stderr and returns `Ok(())` without attaching.
#[cfg(all(target_os = "windows", not(test), not(coverage)))]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
    Ok(())
}
520
/// Stub for test and coverage builds — avoids spawning real terminals during tests.
///
/// Ignores the session ID and always returns `Ok(())`.
// The clippy allows are needed because this stub keeps the fallible, non-const
// signature of the real implementation.
#[cfg(any(test, coverage))]
#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    Ok(())
}
527
528/// Extract a clean error message from an API JSON response (or fall back to raw text).
529fn api_error(text: &str) -> anyhow::Error {
530    serde_json::from_str::<serde_json::Value>(text)
531        .ok()
532        .and_then(|v| v["error"].as_str().map(String::from))
533        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
534}
535
536/// Return the response body text, or a clean error if the response was non-success.
537async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
538    if resp.status().is_success() {
539        Ok(resp.text().await?)
540    } else {
541        let text = resp.text().await?;
542        Err(api_error(&text))
543    }
544}
545
546/// Map a reqwest error to a user-friendly message.
547fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
548    if err.is_connect() {
549        anyhow::anyhow!(
550            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
551        )
552    } else {
553        anyhow::anyhow!("Network error connecting to {node}: {err}")
554    }
555}
556
/// Report whether a node address refers to the local machine.
///
/// Recognizes `localhost` and `127.0.0.1` (with or without a `:port` suffix),
/// the bare IPv6 loopback `::1`, and any address starting with `[::1]`.
fn is_localhost(node: &str) -> bool {
    // IPv6 loopback forms contain colons themselves, so handle them before splitting.
    if node == "::1" || node.starts_with("[::1]") {
        return true;
    }
    let host = node.split_once(':').map_or(node, |(host, _port)| host);
    matches!(host, "localhost" | "127.0.0.1")
}
562
563/// Try to auto-discover the auth token from a local daemon.
564async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
565    let resp = client
566        .get(format!("{base}/api/v1/auth/token"))
567        .send()
568        .await
569        .ok()?;
570    let body: AuthTokenResponse = resp.json().await.ok()?;
571    if body.token.is_empty() {
572        None
573    } else {
574        Some(body.token)
575    }
576}
577
578/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
579async fn resolve_token(
580    client: &reqwest::Client,
581    base: &str,
582    node: &str,
583    explicit: Option<&str>,
584) -> Option<String> {
585    if let Some(t) = explicit {
586        return Some(t.to_owned());
587    }
588    if is_localhost(node) {
589        return discover_token(client, base).await;
590    }
591    None
592}
593
/// Report whether `node` must be resolved to an address, i.e. it contains no `:`
/// and therefore carries no port.
fn node_needs_resolution(node: &str) -> bool {
    node.find(':').is_none()
}
598
599/// Resolve a node reference to a `host:port` address.
600///
601/// If `node` looks like `host:port` (contains `:`), return as-is with no peer token.
602/// Otherwise, query the local daemon's peer registry for a matching name. If a matching
603/// online peer is found, return its address and optionally its configured auth token
604/// (from the config endpoint). Falls back to appending `:7433` if the peer is not found.
605#[cfg(not(coverage))]
606async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
607    // Already has port — use as-is
608    if !node_needs_resolution(node) {
609        return (node.to_owned(), None);
610    }
611
612    // Try to resolve via local daemon's peer registry
613    let local_base = "http://localhost:7433";
614    let mut resolved_address: Option<String> = None;
615
616    if let Ok(resp) = client
617        .get(format!("{local_base}/api/v1/peers"))
618        .send()
619        .await
620        && let Ok(peers_resp) = resp.json::<PeersResponse>().await
621    {
622        for peer in &peers_resp.peers {
623            if peer.name == node {
624                resolved_address = Some(peer.address.clone());
625                break;
626            }
627        }
628    }
629
630    let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
631
632    // Try to get the peer's auth token from the config endpoint
633    let peer_token = if let Ok(resp) = client
634        .get(format!("{local_base}/api/v1/config"))
635        .send()
636        .await
637        && let Ok(config) = resp.json::<ConfigResponse>().await
638        && let Some(entry) = config.peers.get(node)
639    {
640        entry.token().map(String::from)
641    } else {
642        None
643    };
644
645    (address, peer_token)
646}
647
/// Coverage-build replacement for `resolve_node` that performs no HTTP requests.
///
/// Appends `:7433` when `node` has no port, otherwise returns it unchanged.
/// The peer token is always `None`.
#[cfg(coverage)]
async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
    let address = if node_needs_resolution(node) {
        format!("{node}:7433")
    } else {
        node.to_owned()
    };
    (address, None)
}
657
658/// Build an authenticated GET request.
659fn authed_get(
660    client: &reqwest::Client,
661    url: String,
662    token: Option<&str>,
663) -> reqwest::RequestBuilder {
664    let req = client.get(url);
665    if let Some(t) = token {
666        req.bearer_auth(t)
667    } else {
668        req
669    }
670}
671
672/// Build an authenticated POST request.
673fn authed_post(
674    client: &reqwest::Client,
675    url: String,
676    token: Option<&str>,
677) -> reqwest::RequestBuilder {
678    let req = client.post(url);
679    if let Some(t) = token {
680        req.bearer_auth(t)
681    } else {
682        req
683    }
684}
685
686/// Build an authenticated DELETE request.
687fn authed_delete(
688    client: &reqwest::Client,
689    url: String,
690    token: Option<&str>,
691) -> reqwest::RequestBuilder {
692    let req = client.delete(url);
693    if let Some(t) = token {
694        req.bearer_auth(t)
695    } else {
696        req
697    }
698}
699
700/// Build an authenticated PUT request.
701#[cfg(not(coverage))]
702fn authed_put(
703    client: &reqwest::Client,
704    url: String,
705    token: Option<&str>,
706) -> reqwest::RequestBuilder {
707    let req = client.put(url);
708    if let Some(t) = token {
709        req.bearer_auth(t)
710    } else {
711        req
712    }
713}
714
715/// Fetch session output from the API.
716async fn fetch_output(
717    client: &reqwest::Client,
718    base: &str,
719    name: &str,
720    lines: usize,
721    token: Option<&str>,
722) -> Result<String> {
723    let resp = authed_get(
724        client,
725        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
726        token,
727    )
728    .send()
729    .await?;
730    let text = ok_or_api_error(resp).await?;
731    let output: OutputResponse = serde_json::from_str(&text)?;
732    Ok(output.output)
733}
734
735/// Fetch session status from the API.
736async fn fetch_session_status(
737    client: &reqwest::Client,
738    base: &str,
739    name: &str,
740    token: Option<&str>,
741) -> Result<String> {
742    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
743        .send()
744        .await?;
745    let text = ok_or_api_error(resp).await?;
746    let session: Session = serde_json::from_str(&text)?;
747    Ok(session.status.to_string())
748}
749
750/// Wait for the session to leave "creating" state, then check if it died instantly.
751/// Uses the session ID (not name) to avoid matching old stopped sessions with the same name.
752/// Returns an error with a helpful message if the session is lost/stopped.
753async fn check_session_alive(
754    client: &reqwest::Client,
755    base: &str,
756    session_id: &str,
757    token: Option<&str>,
758) -> Result<()> {
759    // Poll up to 3 times at 500ms intervals — handles slow daemons and Docker pull delays
760    for _ in 0..3 {
761        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
762        // Fetch by ID to avoid name collisions with old sessions
763        let resp = authed_get(
764            client,
765            format!("{base}/api/v1/sessions/{session_id}"),
766            token,
767        )
768        .send()
769        .await;
770        if let Ok(resp) = resp
771            && let Ok(text) = ok_or_api_error(resp).await
772            && let Ok(session) = serde_json::from_str::<Session>(&text)
773        {
774            match session.status.to_string().as_str() {
775                "creating" => continue,
776                "lost" | "stopped" => {
777                    anyhow::bail!(
778                        "Session \"{}\" exited immediately — the command may have failed.\n  Check logs: pulpo logs {}",
779                        session.name,
780                        session.name
781                    );
782                }
783                _ => return Ok(()),
784            }
785        }
786        // fetch failed — don't block, proceed to attach
787        break;
788    }
789    Ok(())
790}
791
/// Compute the new trailing lines that differ from the previous output.
///
/// The output endpoint returns the last N lines from the terminal pane. As new lines
/// appear, old lines at the top scroll off. We look for the last line of `prev`
/// inside `new` (searching from the end), confirm that the lines leading up to it
/// also match the tail of `prev`, and return only the part of `new` after that overlap.
///
/// Returns:
/// * all of `new` when `prev` is empty or no overlap is found;
/// * `""` when `new` has no lines or ends exactly where `prev` ended;
/// * otherwise the slice of `new` starting at the first line after the overlap.
///
/// Line terminators are handled as `str::lines` does (`\n` and `\r\n`), and the
/// returned slice always begins on a line boundary.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // A non-empty string always yields at least one line, so the fallback is
    // never taken in practice.
    let last_prev = match prev_lines.last() {
        Some(line) => *line,
        None => return new,
    };

    // Find the last line of prev in new to determine the overlap boundary.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Verify contiguous overlap: the lines before this one must match too.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail != new_overlap {
            continue;
        }
        if i + 1 == new_lines.len() {
            return "";
        }
        // Measure the overlap using the raw segments (terminators included) rather
        // than assuming each line ends in a single `\n`; this keeps the offset
        // correct for `\r\n` output. `split_inclusive('\n')` yields one segment per
        // `lines()` item, so the first `i + 1` segments are exactly the overlap.
        let consumed: usize = new.split_inclusive('\n').take(i + 1).map(str::len).sum();
        return &new[consumed..];
    }

    // No overlap found — output changed completely, print it all.
    new
}
834
835/// Follow logs by polling, printing only new output. Returns when the session ends.
836async fn follow_logs(
837    client: &reqwest::Client,
838    base: &str,
839    name: &str,
840    lines: usize,
841    token: Option<&str>,
842    writer: &mut (dyn std::io::Write + Send),
843) -> Result<()> {
844    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
845    write!(writer, "{prev_output}")?;
846
847    let mut unchanged_ticks: u32 = 0;
848
849    loop {
850        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
851
852        // Fetch latest output
853        let new_output = fetch_output(client, base, name, lines, token).await?;
854
855        let diff = diff_output(&prev_output, &new_output);
856        if diff.is_empty() {
857            unchanged_ticks += 1;
858        } else {
859            write!(writer, "{diff}")?;
860            unchanged_ticks = 0;
861        }
862
863        // Check for agent exit marker in output
864        if new_output.contains(AGENT_EXIT_MARKER) {
865            break;
866        }
867
868        prev_output = new_output;
869
870        // Only check session status when output has been unchanged for 3+ ticks
871        if unchanged_ticks >= 3 {
872            let status = fetch_session_status(client, base, name, token).await?;
873            let is_terminal = status == "ready" || status == "stopped" || status == "lost";
874            if is_terminal {
875                break;
876            }
877        }
878    }
879    Ok(())
880}
881
882// --- Schedule API ---
883
884/// Execute a schedule subcommand via the scheduler API.
885#[cfg(not(coverage))]
886async fn execute_schedule(
887    client: &reqwest::Client,
888    action: &ScheduleAction,
889    base: &str,
890    token: Option<&str>,
891) -> Result<String> {
892    match action {
893        ScheduleAction::Add {
894            name,
895            cron,
896            workdir,
897            node,
898            ink,
899            description,
900            command,
901        } => {
902            let cmd = if command.is_empty() {
903                None
904            } else {
905                Some(command.join(" "))
906            };
907            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
908                std::env::current_dir()
909                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
910            });
911            let mut body = serde_json::json!({
912                "name": name,
913                "cron": cron,
914                "workdir": resolved_workdir,
915            });
916            if let Some(c) = &cmd {
917                body["command"] = serde_json::json!(c);
918            }
919            if let Some(n) = node {
920                body["target_node"] = serde_json::json!(n);
921            }
922            if let Some(i) = ink {
923                body["ink"] = serde_json::json!(i);
924            }
925            if let Some(d) = description {
926                body["description"] = serde_json::json!(d);
927            }
928            let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
929                .json(&body)
930                .send()
931                .await?;
932            ok_or_api_error(resp).await?;
933            Ok(format!("Created schedule \"{name}\""))
934        }
935        ScheduleAction::List => {
936            let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
937                .send()
938                .await?;
939            let text = ok_or_api_error(resp).await?;
940            let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
941            Ok(format_schedules(&schedules))
942        }
943        ScheduleAction::Remove { name } => {
944            let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
945                .send()
946                .await?;
947            ok_or_api_error(resp).await?;
948            Ok(format!("Removed schedule \"{name}\""))
949        }
950        ScheduleAction::Pause { name } => {
951            let body = serde_json::json!({ "enabled": false });
952            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
953                .json(&body)
954                .send()
955                .await?;
956            ok_or_api_error(resp).await?;
957            Ok(format!("Paused schedule \"{name}\""))
958        }
959        ScheduleAction::Resume { name } => {
960            let body = serde_json::json!({ "enabled": true });
961            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
962                .json(&body)
963                .send()
964                .await?;
965            ok_or_api_error(resp).await?;
966            Ok(format!("Resumed schedule \"{name}\""))
967        }
968    }
969}
970
/// Coverage stub for schedule execution.
///
/// Ignores all arguments, performs no requests, and returns an empty string.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_schedule(
    _client: &reqwest::Client,
    _action: &ScheduleAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::new())
}
982
983// --- Secret API ---
984
985/// Format secret entries as a table.
986#[cfg_attr(coverage, allow(dead_code))]
987fn format_secrets(secrets: &[serde_json::Value]) -> String {
988    if secrets.is_empty() {
989        return "No secrets configured.".into();
990    }
991    let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
992    for s in secrets {
993        let name = s["name"].as_str().unwrap_or("?");
994        let env_display = s["env"]
995            .as_str()
996            .map_or_else(|| name.to_owned(), String::from);
997        let created = s["created_at"]
998            .as_str()
999            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1000        lines.push(format!("{name:<24} {env_display:<24} {created}"));
1001    }
1002    lines.join("\n")
1003}
1004
1005/// Execute a secret subcommand via the secrets API.
1006#[cfg(not(coverage))]
1007async fn execute_secret(
1008    client: &reqwest::Client,
1009    action: &SecretAction,
1010    base: &str,
1011    token: Option<&str>,
1012) -> Result<String> {
1013    match action {
1014        SecretAction::Set { name, value, env } => {
1015            let mut body = serde_json::json!({ "value": value });
1016            if let Some(e) = env {
1017                body["env"] = serde_json::json!(e);
1018            }
1019            let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
1020                .json(&body)
1021                .send()
1022                .await?;
1023            ok_or_api_error(resp).await?;
1024            Ok(format!("Secret \"{name}\" set."))
1025        }
1026        SecretAction::List => {
1027            let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
1028                .send()
1029                .await?;
1030            let text = ok_or_api_error(resp).await?;
1031            let parsed: serde_json::Value = serde_json::from_str(&text)?;
1032            let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1033            Ok(format_secrets(secrets))
1034        }
1035        SecretAction::Delete { name } => {
1036            let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1037                .send()
1038                .await?;
1039            ok_or_api_error(resp).await?;
1040            Ok(format!("Secret \"{name}\" deleted."))
1041        }
1042    }
1043}
1044
/// Coverage-build stand-in for the secret executor.
///
/// Compiled only when `cfg(coverage)` is set. Every argument is ignored, no
/// request is issued, and the call always resolves to an empty string.
#[cfg(coverage)]
// The `Result` wrapper keeps the signature identical to the real implementation.
#[allow(clippy::unnecessary_wraps)]
async fn execute_secret(
    _client: &reqwest::Client,
    _action: &SecretAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::default())
}
1056
1057/// Execute a worktree subcommand.
1058#[cfg(not(coverage))]
1059async fn execute_worktree(
1060    client: &reqwest::Client,
1061    action: &WorktreeAction,
1062    base: &str,
1063    token: Option<&str>,
1064) -> Result<String> {
1065    match action {
1066        WorktreeAction::List => {
1067            let resp = authed_get(client, format!("{base}/api/v1/sessions"), token)
1068                .send()
1069                .await?;
1070            let text = ok_or_api_error(resp).await?;
1071            let sessions: Vec<Session> = serde_json::from_str(&text)?;
1072            let wt_sessions: Vec<&Session> = sessions
1073                .iter()
1074                .filter(|s| s.worktree_path.is_some())
1075                .collect();
1076            Ok(format_worktree_sessions(&wt_sessions))
1077        }
1078    }
1079}
1080
/// Coverage-build stand-in for the worktree executor.
///
/// Compiled only when `cfg(coverage)` is set. Every argument is ignored, no
/// request is issued, and the call always resolves to an empty string.
#[cfg(coverage)]
// The `Result` wrapper keeps the signature identical to the real implementation.
#[allow(clippy::unnecessary_wraps)]
async fn execute_worktree(
    _client: &reqwest::Client,
    _action: &WorktreeAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::default())
}
1092
1093/// Format worktree sessions as a table.
1094fn format_worktree_sessions(sessions: &[&Session]) -> String {
1095    if sessions.is_empty() {
1096        return "No worktree sessions.".into();
1097    }
1098    let mut lines = vec![format!(
1099        "{:<20} {:<20} {:<10} {}",
1100        "NAME", "BRANCH", "STATUS", "PATH"
1101    )];
1102    for s in sessions {
1103        let branch = s.worktree_branch.as_deref().unwrap_or("-");
1104        let path = s.worktree_path.as_deref().unwrap_or("-");
1105        lines.push(format!(
1106            "{:<20} {:<20} {:<10} {}",
1107            s.name, branch, s.status, path
1108        ));
1109    }
1110    lines.join("\n")
1111}
1112
1113/// Format a list of schedules as a table.
1114#[cfg_attr(coverage, allow(dead_code))]
1115fn format_schedules(schedules: &[serde_json::Value]) -> String {
1116    if schedules.is_empty() {
1117        return "No schedules.".into();
1118    }
1119    let mut lines = vec![format!(
1120        "{:<20} {:<18} {:<8} {:<12} {}",
1121        "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1122    )];
1123    for s in schedules {
1124        let name = s["name"].as_str().unwrap_or("?");
1125        let cron = s["cron"].as_str().unwrap_or("?");
1126        let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1127            "yes"
1128        } else {
1129            "no"
1130        };
1131        let last_run = s["last_run_at"]
1132            .as_str()
1133            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1134        let node = s["target_node"].as_str().unwrap_or("local");
1135        lines.push(format!(
1136            "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1137        ));
1138    }
1139    lines.join("\n")
1140}
1141
1142/// Select the best node from the peer registry based on load.
1143/// Returns the node address and name of the least loaded online peer.
1144/// Scoring: lower memory usage + fewer active sessions = better.
1145#[cfg(not(coverage))]
1146async fn select_best_node(
1147    client: &reqwest::Client,
1148    base: &str,
1149    token: Option<&str>,
1150) -> Result<(String, String)> {
1151    let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1152        .send()
1153        .await?;
1154    let text = ok_or_api_error(resp).await?;
1155    let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1156
1157    // Score: fewer active sessions is better, more memory is better
1158    let mut best: Option<(String, String, f64)> = None; // (address, name, score)
1159
1160    for peer in &peers_resp.peers {
1161        if peer.status != pulpo_common::peer::PeerStatus::Online {
1162            continue;
1163        }
1164        let sessions = peer.session_count.unwrap_or(0);
1165        let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1166        // Lower score = better (fewer sessions, more memory)
1167        #[allow(clippy::cast_precision_loss)]
1168        let score = sessions as f64 - (mem as f64 / 1024.0);
1169        if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1170            best = Some((peer.address.clone(), peer.name.clone(), score));
1171        }
1172    }
1173
1174    // Fall back to local if no online peers
1175    match best {
1176        Some((addr, name, _)) => Ok((addr, name)),
1177        None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1178    }
1179}
1180
/// Coverage-build stand-in for node auto-selection.
///
/// Compiled only when `cfg(coverage)` is set. No request is issued; the result
/// is always the local fallback `("localhost:7433", "local")`.
#[cfg(coverage)]
// The `Result` wrapper keeps the signature identical to the real implementation.
#[allow(clippy::unnecessary_wraps)]
async fn select_best_node(
    _client: &reqwest::Client,
    _base: &str,
    _token: Option<&str>,
) -> Result<(String, String)> {
    let address = String::from("localhost:7433");
    let name = String::from("local");
    Ok((address, name))
}
1191
1192/// Execute the given CLI command against the specified node.
1193#[allow(clippy::too_many_lines)]
1194pub async fn execute(cli: &Cli) -> Result<String> {
1195    let client = reqwest::Client::new();
1196    let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
1197    let url = base_url(&resolved_node);
1198    let node = &resolved_node;
1199    let token = resolve_token(&client, &url, node, cli.token.as_deref())
1200        .await
1201        .or(peer_token);
1202
1203    // Handle `pulpo <path>` shortcut — spawn a session in the given directory
1204    if cli.command.is_none() && cli.path.is_none() {
1205        // No subcommand and no path: print help
1206        use clap::CommandFactory;
1207        let mut cmd = Cli::command();
1208        cmd.print_help()?;
1209        println!();
1210        return Ok(String::new());
1211    }
1212    if cli.command.is_none() {
1213        let path = cli.path.as_deref().unwrap_or(".");
1214        let resolved_workdir = resolve_path(path);
1215        let base_name = derive_session_name(&resolved_workdir);
1216        let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
1217        let body = serde_json::json!({
1218            "name": name,
1219            "workdir": resolved_workdir,
1220        });
1221        let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1222            .json(&body)
1223            .send()
1224            .await
1225            .map_err(|e| friendly_error(&e, node))?;
1226        let text = ok_or_api_error(resp).await?;
1227        let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1228        let msg = format!(
1229            "Created session \"{}\" ({})",
1230            resp.session.name, resp.session.id
1231        );
1232        let backend_id = resp
1233            .session
1234            .backend_session_id
1235            .as_deref()
1236            .unwrap_or(&resp.session.name);
1237        eprintln!("{msg}");
1238        // Path shortcut spawns a shell (no command) — skip liveness check
1239        // since shell sessions are immediately detected as idle by the watchdog
1240        attach_session(backend_id)?;
1241        return Ok(format!("Detached from session \"{}\".", resp.session.name));
1242    }
1243
1244    match cli.command.as_ref().unwrap() {
1245        Commands::Attach { name } => {
1246            // Fetch session to get status and backend_session_id
1247            let resp = authed_get(
1248                &client,
1249                format!("{url}/api/v1/sessions/{name}"),
1250                token.as_deref(),
1251            )
1252            .send()
1253            .await
1254            .map_err(|e| friendly_error(&e, node))?;
1255            let text = ok_or_api_error(resp).await?;
1256            let session: Session = serde_json::from_str(&text)?;
1257            match session.status.to_string().as_str() {
1258                "lost" => {
1259                    anyhow::bail!(
1260                        "Session \"{name}\" is lost (agent process died). Resume it first:\n  pulpo resume {name}"
1261                    );
1262                }
1263                "stopped" => {
1264                    anyhow::bail!(
1265                        "Session \"{name}\" is {} — cannot attach to a stopped session.",
1266                        session.status
1267                    );
1268                }
1269                _ => {}
1270            }
1271            let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1272            attach_session(&backend_id)?;
1273            Ok(format!("Detached from session {name}."))
1274        }
1275        Commands::Input { name, text } => {
1276            let input_text = text.as_deref().unwrap_or("\n");
1277            let body = serde_json::json!({ "text": input_text });
1278            let resp = authed_post(
1279                &client,
1280                format!("{url}/api/v1/sessions/{name}/input"),
1281                token.as_deref(),
1282            )
1283            .json(&body)
1284            .send()
1285            .await
1286            .map_err(|e| friendly_error(&e, node))?;
1287            ok_or_api_error(resp).await?;
1288            Ok(format!("Sent input to session {name}."))
1289        }
1290        Commands::List { all } => {
1291            let list_url = if *all {
1292                format!("{url}/api/v1/sessions")
1293            } else {
1294                format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
1295            };
1296            let resp = authed_get(&client, list_url, token.as_deref())
1297                .send()
1298                .await
1299                .map_err(|e| friendly_error(&e, node))?;
1300            let text = ok_or_api_error(resp).await?;
1301            let sessions: Vec<Session> = serde_json::from_str(&text)?;
1302            Ok(format_sessions(&sessions))
1303        }
1304        Commands::Nodes => {
1305            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1306                .send()
1307                .await
1308                .map_err(|e| friendly_error(&e, node))?;
1309            let text = ok_or_api_error(resp).await?;
1310            let resp: PeersResponse = serde_json::from_str(&text)?;
1311            Ok(format_nodes(&resp))
1312        }
1313        Commands::Spawn {
1314            workdir,
1315            name,
1316            ink,
1317            description,
1318            detach,
1319            idle_threshold,
1320            auto,
1321            worktree,
1322            worktree_base,
1323            runtime,
1324            secret,
1325            command,
1326        } => {
1327            let cmd = if command.is_empty() {
1328                None
1329            } else {
1330                Some(command.join(" "))
1331            };
1332            // Resolve workdir: --workdir flag > current directory
1333            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1334                std::env::current_dir()
1335                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1336            });
1337            // Resolve name: explicit > derived from workdir (with dedup)
1338            let resolved_name = if let Some(n) = name {
1339                n.clone()
1340            } else {
1341                let base_name = derive_session_name(&resolved_workdir);
1342                deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1343            };
1344            let mut body = serde_json::json!({
1345                "name": resolved_name,
1346                "workdir": resolved_workdir,
1347            });
1348            if let Some(c) = &cmd {
1349                body["command"] = serde_json::json!(c);
1350            }
1351            if let Some(i) = ink {
1352                body["ink"] = serde_json::json!(i);
1353            }
1354            if let Some(d) = description {
1355                body["description"] = serde_json::json!(d);
1356            }
1357            if let Some(t) = idle_threshold {
1358                body["idle_threshold_secs"] = serde_json::json!(t);
1359            }
1360            // --base-branch implies --worktree
1361            if *worktree || worktree_base.is_some() {
1362                body["worktree"] = serde_json::json!(true);
1363                if let Some(base) = worktree_base {
1364                    body["worktree_base"] = serde_json::json!(base);
1365                    eprintln!(
1366                        "Worktree: branch {resolved_name} (from {base}) in ~/.pulpo/worktrees/{resolved_name}/"
1367                    );
1368                } else {
1369                    eprintln!(
1370                        "Worktree: branch {resolved_name} in ~/.pulpo/worktrees/{resolved_name}/"
1371                    );
1372                }
1373            }
1374            if let Some(rt) = runtime {
1375                body["runtime"] = serde_json::json!(rt);
1376            }
1377            if !secret.is_empty() {
1378                body["secrets"] = serde_json::json!(secret);
1379            }
1380            let spawn_url = if *auto {
1381                let (auto_addr, auto_name) =
1382                    select_best_node(&client, &url, token.as_deref()).await?;
1383                eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1384                base_url(&auto_addr)
1385            } else {
1386                url.clone()
1387            };
1388            let resp = authed_post(
1389                &client,
1390                format!("{spawn_url}/api/v1/sessions"),
1391                token.as_deref(),
1392            )
1393            .json(&body)
1394            .send()
1395            .await
1396            .map_err(|e| friendly_error(&e, node))?;
1397            let text = ok_or_api_error(resp).await?;
1398            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1399            let msg = format!(
1400                "Created session \"{}\" ({})",
1401                resp.session.name, resp.session.id
1402            );
1403            if !detach {
1404                let backend_id = resp
1405                    .session
1406                    .backend_session_id
1407                    .as_deref()
1408                    .unwrap_or(&resp.session.name);
1409                eprintln!("{msg}");
1410                // Only check liveness for explicit commands — shell sessions (no command)
1411                // may be immediately marked idle/stopped by the watchdog, which is expected
1412                if cmd.is_some() {
1413                    let sid = resp.session.id.to_string();
1414                    check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1415                }
1416                attach_session(backend_id)?;
1417                return Ok(format!("Detached from session \"{}\".", resp.session.name));
1418            }
1419            Ok(msg)
1420        }
1421        Commands::Stop { names, purge } => {
1422            let mut results = Vec::new();
1423            for name in names {
1424                let query = if *purge { "?purge=true" } else { "" };
1425                let resp = authed_post(
1426                    &client,
1427                    format!("{url}/api/v1/sessions/{name}/stop{query}"),
1428                    token.as_deref(),
1429                )
1430                .send()
1431                .await
1432                .map_err(|e| friendly_error(&e, node))?;
1433                let action = if *purge {
1434                    "stopped and purged"
1435                } else {
1436                    "stopped"
1437                };
1438                match ok_or_api_error(resp).await {
1439                    Ok(_) => results.push(format!("Session {name} {action}.")),
1440                    Err(e) => results.push(format!("Error stopping {name}: {e}")),
1441                }
1442            }
1443            Ok(results.join("\n"))
1444        }
1445        Commands::Cleanup => {
1446            let resp = authed_post(
1447                &client,
1448                format!("{url}/api/v1/sessions/cleanup"),
1449                token.as_deref(),
1450            )
1451            .send()
1452            .await
1453            .map_err(|e| friendly_error(&e, node))?;
1454            let text = ok_or_api_error(resp).await?;
1455            let result: serde_json::Value = serde_json::from_str(&text)?;
1456            let count = result["deleted"].as_u64().unwrap_or(0);
1457            if count == 0 {
1458                Ok("No stopped or lost sessions to clean up.".into())
1459            } else {
1460                Ok(format!("Cleaned up {count} session(s)."))
1461            }
1462        }
1463        Commands::Logs {
1464            name,
1465            lines,
1466            follow,
1467        } => {
1468            if *follow {
1469                let mut stdout = std::io::stdout();
1470                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1471                    .await
1472                    .map_err(|e| {
1473                        // Unwrap reqwest errors to friendly messages
1474                        match e.downcast::<reqwest::Error>() {
1475                            Ok(re) => friendly_error(&re, node),
1476                            Err(other) => other,
1477                        }
1478                    })?;
1479                Ok(String::new())
1480            } else {
1481                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1482                    .await
1483                    .map_err(|e| match e.downcast::<reqwest::Error>() {
1484                        Ok(re) => friendly_error(&re, node),
1485                        Err(other) => other,
1486                    })?;
1487                Ok(output)
1488            }
1489        }
1490        Commands::Interventions { name } => {
1491            let resp = authed_get(
1492                &client,
1493                format!("{url}/api/v1/sessions/{name}/interventions"),
1494                token.as_deref(),
1495            )
1496            .send()
1497            .await
1498            .map_err(|e| friendly_error(&e, node))?;
1499            let text = ok_or_api_error(resp).await?;
1500            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1501            Ok(format_interventions(&events))
1502        }
1503        Commands::Ui => {
1504            let dashboard = base_url(node);
1505            open_browser(&dashboard)?;
1506            Ok(format!("Opening {dashboard}"))
1507        }
1508        Commands::Resume { name } => {
1509            let resp = authed_post(
1510                &client,
1511                format!("{url}/api/v1/sessions/{name}/resume"),
1512                token.as_deref(),
1513            )
1514            .send()
1515            .await
1516            .map_err(|e| friendly_error(&e, node))?;
1517            let text = ok_or_api_error(resp).await?;
1518            let session: Session = serde_json::from_str(&text)?;
1519            let backend_id = session
1520                .backend_session_id
1521                .as_deref()
1522                .unwrap_or(&session.name);
1523            eprintln!("Resumed session \"{}\"", session.name);
1524            let sid = session.id.to_string();
1525            check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1526            attach_session(backend_id)?;
1527            Ok(format!("Detached from session \"{}\".", session.name))
1528        }
1529        Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1530            .await
1531            .map_err(|e| match e.downcast::<reqwest::Error>() {
1532                Ok(re) => friendly_error(&re, node),
1533                Err(other) => other,
1534            }),
1535        Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
1536            .await
1537            .map_err(|e| match e.downcast::<reqwest::Error>() {
1538                Ok(re) => friendly_error(&re, node),
1539                Err(other) => other,
1540            }),
1541        Commands::Worktree { action } => execute_worktree(&client, action, &url, token.as_deref())
1542            .await
1543            .map_err(|e| match e.downcast::<reqwest::Error>() {
1544                Ok(re) => friendly_error(&re, node),
1545                Err(other) => other,
1546            }),
1547    }
1548}
1549
1550#[cfg(test)]
1551mod tests {
1552    use super::*;
1553
1554    #[test]
1555    fn test_base_url() {
1556        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1557        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1558    }
1559
1560    #[test]
1561    fn test_cli_parse_list() {
1562        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1563        assert_eq!(cli.node, "localhost:7433");
1564        assert!(matches!(cli.command, Some(Commands::List { .. })));
1565    }
1566
1567    #[test]
1568    fn test_cli_parse_nodes() {
1569        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1570        assert!(matches!(cli.command, Some(Commands::Nodes)));
1571    }
1572
1573    #[test]
1574    fn test_cli_parse_ui() {
1575        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1576        assert!(matches!(cli.command, Some(Commands::Ui)));
1577    }
1578
1579    #[test]
1580    fn test_cli_parse_ui_custom_node() {
1581        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1582        // With args_conflicts_with_subcommands, "ui" is parsed as path when --node is explicit
1583        assert_eq!(cli.node, "mac-mini:7433");
1584        assert_eq!(cli.path.as_deref(), Some("ui"));
1585    }
1586
1587    #[test]
1588    fn test_build_open_command() {
1589        let cmd = build_open_command("http://localhost:7433");
1590        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1591        assert_eq!(args, vec!["http://localhost:7433"]);
1592        #[cfg(target_os = "macos")]
1593        assert_eq!(cmd.get_program(), "open");
1594        #[cfg(target_os = "linux")]
1595        assert_eq!(cmd.get_program(), "xdg-open");
1596    }
1597
1598    #[test]
1599    fn test_cli_parse_spawn() {
1600        let cli = Cli::try_parse_from([
1601            "pulpo",
1602            "spawn",
1603            "my-task",
1604            "--workdir",
1605            "/tmp/repo",
1606            "--",
1607            "claude",
1608            "-p",
1609            "Fix the bug",
1610        ])
1611        .unwrap();
1612        assert!(matches!(
1613            &cli.command,
1614            Some(Commands::Spawn { name, workdir, command, .. })
1615                if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1616                && command == &["claude", "-p", "Fix the bug"]
1617        ));
1618    }
1619
1620    #[test]
1621    fn test_cli_parse_spawn_with_ink() {
1622        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1623        assert!(matches!(
1624            &cli.command,
1625            Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1626        ));
1627    }
1628
1629    #[test]
1630    fn test_cli_parse_spawn_with_description() {
1631        let cli =
1632            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1633                .unwrap();
1634        assert!(matches!(
1635            &cli.command,
1636            Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1637        ));
1638    }
1639
1640    #[test]
1641    fn test_cli_parse_spawn_name_positional() {
1642        let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1643        assert!(matches!(
1644            &cli.command,
1645            Some(Commands::Spawn { name, command, .. })
1646                if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1647        ));
1648    }
1649
1650    #[test]
1651    fn test_cli_parse_spawn_no_command() {
1652        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1653        assert!(matches!(
1654            &cli.command,
1655            Some(Commands::Spawn { command, .. }) if command.is_empty()
1656        ));
1657    }
1658
1659    #[test]
1660    fn test_cli_parse_spawn_idle_threshold() {
1661        let cli =
1662            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1663        assert!(matches!(
1664            &cli.command,
1665            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1666        ));
1667    }
1668
1669    #[test]
1670    fn test_cli_parse_spawn_idle_threshold_60() {
1671        let cli =
1672            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "60"]).unwrap();
1673        assert!(matches!(
1674            &cli.command,
1675            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(60)
1676        ));
1677    }
1678
1679    #[test]
1680    fn test_cli_parse_spawn_secrets() {
1681        let cli = Cli::try_parse_from([
1682            "pulpo",
1683            "spawn",
1684            "my-task",
1685            "--secret",
1686            "GITHUB_TOKEN",
1687            "--secret",
1688            "NPM_TOKEN",
1689        ])
1690        .unwrap();
1691        assert!(matches!(
1692            &cli.command,
1693            Some(Commands::Spawn { secret, .. }) if secret == &["GITHUB_TOKEN", "NPM_TOKEN"]
1694        ));
1695    }
1696
1697    #[test]
1698    fn test_cli_parse_spawn_no_secrets() {
1699        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1700        assert!(matches!(
1701            &cli.command,
1702            Some(Commands::Spawn { secret, .. }) if secret.is_empty()
1703        ));
1704    }
1705
1706    #[test]
1707    fn test_cli_parse_secret_set_with_env() {
1708        let cli = Cli::try_parse_from([
1709            "pulpo",
1710            "secret",
1711            "set",
1712            "GH_WORK",
1713            "token123",
1714            "--env",
1715            "GITHUB_TOKEN",
1716        ])
1717        .unwrap();
1718        assert!(matches!(
1719            &cli.command,
1720            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1721                if name == "GH_WORK" && value == "token123" && env.as_deref() == Some("GITHUB_TOKEN")
1722        ));
1723    }
1724
1725    #[test]
1726    fn test_cli_parse_secret_set_without_env() {
1727        let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_KEY", "val"]).unwrap();
1728        assert!(matches!(
1729            &cli.command,
1730            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1731                if name == "MY_KEY" && value == "val" && env.is_none()
1732        ));
1733    }
1734
1735    #[test]
1736    fn test_cli_parse_spawn_worktree() {
1737        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree"]).unwrap();
1738        assert!(matches!(
1739            &cli.command,
1740            Some(Commands::Spawn { worktree, .. }) if *worktree
1741        ));
1742    }
1743
1744    #[test]
1745    fn test_cli_parse_spawn_worktree_base() {
1746        let cli = Cli::try_parse_from([
1747            "pulpo",
1748            "spawn",
1749            "my-task",
1750            "--worktree",
1751            "--worktree-base",
1752            "main",
1753        ])
1754        .unwrap();
1755        assert!(matches!(
1756            &cli.command,
1757            Some(Commands::Spawn { worktree, worktree_base, .. })
1758                if *worktree && worktree_base.as_deref() == Some("main")
1759        ));
1760    }
1761
1762    #[test]
1763    fn test_cli_parse_spawn_worktree_base_implies_worktree() {
1764        // --worktree-base without --worktree should still parse (implied at execute time)
1765        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree-base", "develop"])
1766            .unwrap();
1767        assert!(matches!(
1768            &cli.command,
1769            Some(Commands::Spawn { worktree_base, .. })
1770                if worktree_base.as_deref() == Some("develop")
1771        ));
1772    }
1773
1774    #[test]
1775    fn test_cli_parse_worktree_list() {
1776        let cli = Cli::try_parse_from(["pulpo", "worktree", "list"]).unwrap();
1777        assert!(matches!(
1778            &cli.command,
1779            Some(Commands::Worktree {
1780                action: WorktreeAction::List
1781            })
1782        ));
1783    }
1784
1785    #[test]
1786    fn test_cli_parse_wt_alias() {
1787        let cli = Cli::try_parse_from(["pulpo", "wt", "list"]).unwrap();
1788        assert!(matches!(
1789            &cli.command,
1790            Some(Commands::Worktree {
1791                action: WorktreeAction::List
1792            })
1793        ));
1794    }
1795
1796    #[test]
1797    fn test_cli_parse_worktree_list_ls_alias() {
1798        let cli = Cli::try_parse_from(["pulpo", "wt", "ls"]).unwrap();
1799        assert!(matches!(
1800            &cli.command,
1801            Some(Commands::Worktree {
1802                action: WorktreeAction::List
1803            })
1804        ));
1805    }
1806
1807    #[test]
1808    fn test_format_worktree_sessions_empty() {
1809        let output = format_worktree_sessions(&[]);
1810        assert_eq!(output, "No worktree sessions.");
1811    }
1812
1813    #[test]
1814    fn test_format_worktree_sessions_with_data() {
1815        use chrono::Utc;
1816        use pulpo_common::session::SessionStatus;
1817        use uuid::Uuid;
1818
1819        let session = Session {
1820            id: Uuid::nil(),
1821            name: "fix-auth".into(),
1822            workdir: "/tmp/repo".into(),
1823            command: "claude -p 'fix auth'".into(),
1824            description: None,
1825            status: SessionStatus::Active,
1826            exit_code: None,
1827            backend_session_id: None,
1828            output_snapshot: None,
1829            metadata: None,
1830            ink: None,
1831            intervention_code: None,
1832            intervention_reason: None,
1833            intervention_at: None,
1834            last_output_at: None,
1835            idle_since: None,
1836            idle_threshold_secs: None,
1837            worktree_path: Some("/home/user/.pulpo/worktrees/fix-auth".into()),
1838            worktree_branch: Some("fix-auth".into()),
1839            runtime: Runtime::Tmux,
1840            created_at: Utc::now(),
1841            updated_at: Utc::now(),
1842        };
1843        let sessions = vec![&session];
1844        let output = format_worktree_sessions(&sessions);
1845        assert!(output.contains("fix-auth"), "should show name: {output}");
1846        assert!(output.contains("active"), "should show status: {output}");
1847        assert!(
1848            output.contains("/home/user/.pulpo/worktrees/fix-auth"),
1849            "should show path: {output}"
1850        );
1851        assert!(output.contains("BRANCH"), "should have header: {output}");
1852    }
1853
1854    #[test]
1855    fn test_format_worktree_sessions_no_branch() {
1856        use chrono::Utc;
1857        use pulpo_common::session::SessionStatus;
1858        use uuid::Uuid;
1859
1860        let session = Session {
1861            id: Uuid::nil(),
1862            name: "old-session".into(),
1863            workdir: "/tmp".into(),
1864            command: "echo".into(),
1865            description: None,
1866            status: SessionStatus::Active,
1867            exit_code: None,
1868            backend_session_id: None,
1869            output_snapshot: None,
1870            metadata: None,
1871            ink: None,
1872            intervention_code: None,
1873            intervention_reason: None,
1874            intervention_at: None,
1875            last_output_at: None,
1876            idle_since: None,
1877            idle_threshold_secs: None,
1878            worktree_path: Some("/home/user/.pulpo/worktrees/old-session".into()),
1879            worktree_branch: None,
1880            runtime: Runtime::Tmux,
1881            created_at: Utc::now(),
1882            updated_at: Utc::now(),
1883        };
1884        let sessions = vec![&session];
1885        let output = format_worktree_sessions(&sessions);
1886        assert!(
1887            output.contains('-'),
1888            "branch should show dash when None: {output}"
1889        );
1890    }
1891
1892    #[test]
1893    fn test_cli_parse_spawn_detach() {
1894        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
1895        assert!(matches!(
1896            &cli.command,
1897            Some(Commands::Spawn { detach, .. }) if *detach
1898        ));
1899    }
1900
1901    #[test]
1902    fn test_cli_parse_spawn_detach_short() {
1903        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
1904        assert!(matches!(
1905            &cli.command,
1906            Some(Commands::Spawn { detach, .. }) if *detach
1907        ));
1908    }
1909
1910    #[test]
1911    fn test_cli_parse_spawn_detach_default() {
1912        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1913        assert!(matches!(
1914            &cli.command,
1915            Some(Commands::Spawn { detach, .. }) if !detach
1916        ));
1917    }
1918
1919    #[test]
1920    fn test_cli_parse_logs() {
1921        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1922        assert!(matches!(
1923            &cli.command,
1924            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 100 && !follow
1925        ));
1926    }
1927
1928    #[test]
1929    fn test_cli_parse_logs_with_lines() {
1930        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1931        assert!(matches!(
1932            &cli.command,
1933            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 50 && !follow
1934        ));
1935    }
1936
1937    #[test]
1938    fn test_cli_parse_logs_follow() {
1939        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1940        assert!(matches!(
1941            &cli.command,
1942            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1943        ));
1944    }
1945
1946    #[test]
1947    fn test_cli_parse_logs_follow_short() {
1948        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1949        assert!(matches!(
1950            &cli.command,
1951            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1952        ));
1953    }
1954
1955    #[test]
1956    fn test_cli_parse_stop() {
1957        let cli = Cli::try_parse_from(["pulpo", "stop", "my-session"]).unwrap();
1958        assert!(matches!(
1959            &cli.command,
1960            Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
1961        ));
1962    }
1963
1964    #[test]
1965    fn test_cli_parse_stop_purge() {
1966        let cli = Cli::try_parse_from(["pulpo", "stop", "my-session", "--purge"]).unwrap();
1967        assert!(matches!(
1968            &cli.command,
1969            Some(Commands::Stop { names, purge }) if names == &["my-session"] && *purge
1970        ));
1971
1972        let cli = Cli::try_parse_from(["pulpo", "stop", "my-session", "-p"]).unwrap();
1973        assert!(matches!(
1974            &cli.command,
1975            Some(Commands::Stop { names, purge }) if names == &["my-session"] && *purge
1976        ));
1977    }
1978
1979    #[test]
1980    fn test_cli_parse_kill_alias() {
1981        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1982        assert!(matches!(
1983            &cli.command,
1984            Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
1985        ));
1986    }
1987
1988    #[test]
1989    fn test_cli_parse_resume() {
1990        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1991        assert!(matches!(
1992            &cli.command,
1993            Some(Commands::Resume { name }) if name == "my-session"
1994        ));
1995    }
1996
1997    #[test]
1998    fn test_cli_parse_input() {
1999        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
2000        assert!(matches!(
2001            &cli.command,
2002            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("yes")
2003        ));
2004    }
2005
2006    #[test]
2007    fn test_cli_parse_input_no_text() {
2008        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
2009        assert!(matches!(
2010            &cli.command,
2011            Some(Commands::Input { name, text }) if name == "my-session" && text.is_none()
2012        ));
2013    }
2014
2015    #[test]
2016    fn test_cli_parse_input_alias() {
2017        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
2018        assert!(matches!(
2019            &cli.command,
2020            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
2021        ));
2022    }
2023
2024    #[test]
2025    fn test_cli_parse_custom_node() {
2026        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
2027        assert_eq!(cli.node, "win-pc:8080");
2028        // With args_conflicts_with_subcommands, "list" is parsed as path when --node is explicit
2029        assert_eq!(cli.path.as_deref(), Some("list"));
2030    }
2031
2032    #[test]
2033    fn test_cli_version() {
2034        let result = Cli::try_parse_from(["pulpo", "--version"]);
2035        // clap exits with an error (kind DisplayVersion) when --version is used
2036        let err = result.unwrap_err();
2037        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
2038    }
2039
2040    #[test]
2041    fn test_cli_parse_no_subcommand_succeeds() {
2042        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
2043        assert!(cli.command.is_none());
2044        assert!(cli.path.is_none());
2045    }
2046
2047    #[test]
2048    fn test_cli_debug() {
2049        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2050        let debug = format!("{cli:?}");
2051        assert!(debug.contains("List"));
2052    }
2053
2054    #[test]
2055    fn test_commands_debug() {
2056        let cmd = Commands::List { all: false };
2057        assert_eq!(format!("{cmd:?}"), "List { all: false }");
2058    }
2059
2060    /// A valid Session JSON for test responses.
2061    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2062
2063    /// A valid `CreateSessionResponse` JSON wrapping the session.
2064    fn test_create_response_json() -> String {
2065        format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
2066    }
2067
2068    /// Start a lightweight test HTTP server and return its address.
2069    async fn start_test_server() -> String {
2070        use axum::http::StatusCode;
2071        use axum::{
2072            Json, Router,
2073            routing::{get, post},
2074        };
2075
2076        let create_json = test_create_response_json();
2077
2078        let app = Router::new()
2079            .route(
2080                "/api/v1/sessions",
2081                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
2082                    (StatusCode::CREATED, create_json.clone())
2083                }),
2084            )
2085            .route(
2086                "/api/v1/sessions/{id}",
2087                get(|| async { TEST_SESSION_JSON.to_owned() }),
2088            )
2089            .route(
2090                "/api/v1/sessions/{id}/stop",
2091                post(|| async { StatusCode::NO_CONTENT }),
2092            )
2093            .route(
2094                "/api/v1/sessions/{id}/output",
2095                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
2096            )
2097            .route(
2098                "/api/v1/peers",
2099                get(|| async {
2100                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
2101                }),
2102            )
2103            .route(
2104                "/api/v1/sessions/{id}/resume",
2105                axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
2106            )
2107            .route(
2108                "/api/v1/sessions/{id}/interventions",
2109                get(|| async { "[]".to_owned() }),
2110            )
2111            .route(
2112                "/api/v1/sessions/{id}/input",
2113                post(|| async { StatusCode::NO_CONTENT }),
2114            )
2115            .route(
2116                "/api/v1/schedules",
2117                get(|| async { Json::<Vec<()>>(vec![]) })
2118                    .post(|| async { StatusCode::CREATED }),
2119            )
2120            .route(
2121                "/api/v1/schedules/{id}",
2122                axum::routing::put(|| async { StatusCode::OK })
2123                    .delete(|| async { StatusCode::NO_CONTENT }),
2124            );
2125
2126        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2127        let addr = listener.local_addr().unwrap();
2128        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2129        format!("127.0.0.1:{}", addr.port())
2130    }
2131
2132    #[tokio::test]
2133    async fn test_execute_list_success() {
2134        let node = start_test_server().await;
2135        let cli = Cli {
2136            node,
2137            token: None,
2138            command: Some(Commands::List { all: false }),
2139            path: None,
2140        };
2141        let result = execute(&cli).await.unwrap();
2142        assert_eq!(result, "No sessions.");
2143    }
2144
2145    #[tokio::test]
2146    async fn test_execute_nodes_success() {
2147        let node = start_test_server().await;
2148        let cli = Cli {
2149            node,
2150            token: None,
2151            command: Some(Commands::Nodes),
2152            path: None,
2153        };
2154        let result = execute(&cli).await.unwrap();
2155        assert!(result.contains("test"));
2156        assert!(result.contains("(local)"));
2157        assert!(result.contains("NAME"));
2158    }
2159
2160    #[tokio::test]
2161    async fn test_execute_spawn_success() {
2162        let node = start_test_server().await;
2163        let cli = Cli {
2164            node,
2165            token: None,
2166            command: Some(Commands::Spawn {
2167                name: Some("test".into()),
2168                workdir: Some("/tmp/repo".into()),
2169                ink: None,
2170                description: None,
2171                detach: true,
2172                idle_threshold: None,
2173                auto: false,
2174                worktree: false,
2175                worktree_base: None,
2176                runtime: None,
2177                secret: vec![],
2178                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2179            }),
2180            path: None,
2181        };
2182        let result = execute(&cli).await.unwrap();
2183        assert!(result.contains("Created session"));
2184        assert!(result.contains("repo"));
2185    }
2186
2187    #[tokio::test]
2188    async fn test_execute_spawn_with_all_flags() {
2189        let node = start_test_server().await;
2190        let cli = Cli {
2191            node,
2192            token: None,
2193            command: Some(Commands::Spawn {
2194                name: Some("test".into()),
2195                workdir: Some("/tmp/repo".into()),
2196                ink: Some("coder".into()),
2197                description: Some("Fix the bug".into()),
2198                detach: true,
2199                idle_threshold: None,
2200                auto: false,
2201                worktree: false,
2202                worktree_base: None,
2203                runtime: None,
2204                secret: vec![],
2205                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2206            }),
2207            path: None,
2208        };
2209        let result = execute(&cli).await.unwrap();
2210        assert!(result.contains("Created session"));
2211    }
2212
2213    #[tokio::test]
2214    async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
2215        let node = start_test_server().await;
2216        let cli = Cli {
2217            node,
2218            token: None,
2219            command: Some(Commands::Spawn {
2220                name: Some("full-opts".into()),
2221                workdir: Some("/tmp/repo".into()),
2222                ink: Some("coder".into()),
2223                description: Some("Full options".into()),
2224                detach: true,
2225                idle_threshold: Some(120),
2226                auto: false,
2227                worktree: true,
2228                worktree_base: None,
2229                runtime: Some("docker".into()),
2230                secret: vec![],
2231                command: vec!["claude".into()],
2232            }),
2233            path: None,
2234        };
2235        let result = execute(&cli).await.unwrap();
2236        assert!(result.contains("Created session"));
2237    }
2238
2239    #[tokio::test]
2240    async fn test_execute_spawn_no_name_derives_from_workdir() {
2241        let node = start_test_server().await;
2242        let cli = Cli {
2243            node,
2244            token: None,
2245            command: Some(Commands::Spawn {
2246                name: None,
2247                workdir: Some("/tmp/my-project".into()),
2248                ink: None,
2249                description: None,
2250                detach: true,
2251                idle_threshold: None,
2252                auto: false,
2253                worktree: false,
2254                worktree_base: None,
2255                runtime: None,
2256                secret: vec![],
2257                command: vec!["echo".into(), "hello".into()],
2258            }),
2259            path: None,
2260        };
2261        let result = execute(&cli).await.unwrap();
2262        assert!(result.contains("Created session"));
2263    }
2264
2265    #[tokio::test]
2266    async fn test_execute_spawn_no_command() {
2267        let node = start_test_server().await;
2268        let cli = Cli {
2269            node,
2270            token: None,
2271            command: Some(Commands::Spawn {
2272                name: Some("test".into()),
2273                workdir: Some("/tmp/repo".into()),
2274                ink: None,
2275                description: None,
2276                detach: true,
2277                idle_threshold: None,
2278                auto: false,
2279                worktree: false,
2280                worktree_base: None,
2281                runtime: None,
2282                secret: vec![],
2283                command: vec![],
2284            }),
2285            path: None,
2286        };
2287        let result = execute(&cli).await.unwrap();
2288        assert!(result.contains("Created session"));
2289    }
2290
2291    #[tokio::test]
2292    async fn test_execute_spawn_with_name() {
2293        let node = start_test_server().await;
2294        let cli = Cli {
2295            node,
2296            token: None,
2297            command: Some(Commands::Spawn {
2298                name: Some("my-task".into()),
2299                workdir: Some("/tmp/repo".into()),
2300                ink: None,
2301                description: None,
2302                detach: true,
2303                idle_threshold: None,
2304                auto: false,
2305                worktree: false,
2306                worktree_base: None,
2307                runtime: None,
2308                secret: vec![],
2309                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2310            }),
2311            path: None,
2312        };
2313        let result = execute(&cli).await.unwrap();
2314        assert!(result.contains("Created session"));
2315    }
2316
2317    #[tokio::test]
2318    async fn test_execute_spawn_auto_attach() {
2319        let node = start_test_server().await;
2320        let cli = Cli {
2321            node,
2322            token: None,
2323            command: Some(Commands::Spawn {
2324                name: Some("test".into()),
2325                workdir: Some("/tmp/repo".into()),
2326                ink: None,
2327                description: None,
2328                detach: false,
2329                idle_threshold: None,
2330                auto: false,
2331                worktree: false,
2332                worktree_base: None,
2333                runtime: None,
2334                secret: vec![],
2335                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2336            }),
2337            path: None,
2338        };
2339        let result = execute(&cli).await.unwrap();
2340        // When not detached, spawn prints creation to stderr and returns detach message
2341        assert!(result.contains("Detached from session"));
2342    }
2343
2344    #[tokio::test]
2345    async fn test_execute_stop_success() {
2346        let node = start_test_server().await;
2347        let cli = Cli {
2348            node,
2349            token: None,
2350            command: Some(Commands::Stop {
2351                names: vec!["test-session".into()],
2352                purge: false,
2353            }),
2354            path: None,
2355        };
2356        let result = execute(&cli).await.unwrap();
2357        assert!(result.contains("stopped"));
2358        assert!(!result.contains("purged"));
2359    }
2360
2361    #[tokio::test]
2362    async fn test_execute_stop_with_purge() {
2363        let node = start_test_server().await;
2364        let cli = Cli {
2365            node,
2366            token: None,
2367            command: Some(Commands::Stop {
2368                names: vec!["test-session".into()],
2369                purge: true,
2370            }),
2371            path: None,
2372        };
2373        let result = execute(&cli).await.unwrap();
2374        assert!(result.contains("stopped and purged"));
2375    }
2376
2377    #[tokio::test]
2378    async fn test_execute_logs_success() {
2379        let node = start_test_server().await;
2380        let cli = Cli {
2381            node,
2382            token: None,
2383            command: Some(Commands::Logs {
2384                name: "test-session".into(),
2385                lines: 50,
2386                follow: false,
2387            }),
2388            path: None,
2389        };
2390        let result = execute(&cli).await.unwrap();
2391        assert!(result.contains("test output"));
2392    }
2393
2394    #[tokio::test]
2395    async fn test_execute_list_connection_refused() {
2396        let cli = Cli {
2397            node: "localhost:1".into(),
2398            token: None,
2399            command: Some(Commands::List { all: false }),
2400            path: None,
2401        };
2402        let result = execute(&cli).await;
2403        let err = result.unwrap_err().to_string();
2404        assert!(
2405            err.contains("Could not connect to pulpod"),
2406            "Expected friendly error, got: {err}"
2407        );
2408        assert!(err.contains("localhost:1"));
2409    }
2410
2411    #[tokio::test]
2412    async fn test_execute_nodes_connection_refused() {
2413        let cli = Cli {
2414            node: "localhost:1".into(),
2415            token: None,
2416            command: Some(Commands::Nodes),
2417            path: None,
2418        };
2419        let result = execute(&cli).await;
2420        let err = result.unwrap_err().to_string();
2421        assert!(err.contains("Could not connect to pulpod"));
2422    }
2423
2424    #[tokio::test]
2425    async fn test_execute_stop_error_response() {
2426        use axum::{Router, http::StatusCode, routing::post};
2427
2428        let app = Router::new().route(
2429            "/api/v1/sessions/{id}/stop",
2430            post(|| async {
2431                (
2432                    StatusCode::NOT_FOUND,
2433                    "{\"error\":\"session not found: test-session\"}",
2434                )
2435            }),
2436        );
2437        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2438        let addr = listener.local_addr().unwrap();
2439        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2440        let node = format!("127.0.0.1:{}", addr.port());
2441
2442        let cli = Cli {
2443            node,
2444            token: None,
2445            command: Some(Commands::Stop {
2446                names: vec!["test-session".into()],
2447                purge: false,
2448            }),
2449            path: None,
2450        };
2451        let result = execute(&cli).await.unwrap();
2452        assert!(result.contains("Error stopping test-session"), "{result}");
2453    }
2454
2455    #[tokio::test]
2456    async fn test_execute_logs_error_response() {
2457        use axum::{Router, http::StatusCode, routing::get};
2458
2459        let app = Router::new().route(
2460            "/api/v1/sessions/{id}/output",
2461            get(|| async {
2462                (
2463                    StatusCode::NOT_FOUND,
2464                    "{\"error\":\"session not found: ghost\"}",
2465                )
2466            }),
2467        );
2468        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2469        let addr = listener.local_addr().unwrap();
2470        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2471        let node = format!("127.0.0.1:{}", addr.port());
2472
2473        let cli = Cli {
2474            node,
2475            token: None,
2476            command: Some(Commands::Logs {
2477                name: "ghost".into(),
2478                lines: 50,
2479                follow: false,
2480            }),
2481            path: None,
2482        };
2483        let err = execute(&cli).await.unwrap_err();
2484        assert_eq!(err.to_string(), "session not found: ghost");
2485    }
2486
2487    #[tokio::test]
2488    async fn test_execute_resume_error_response() {
2489        use axum::{Router, http::StatusCode, routing::post};
2490
2491        let app = Router::new().route(
2492            "/api/v1/sessions/{id}/resume",
2493            post(|| async {
2494                (
2495                    StatusCode::BAD_REQUEST,
2496                    "{\"error\":\"session is not lost (status: active)\"}",
2497                )
2498            }),
2499        );
2500        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2501        let addr = listener.local_addr().unwrap();
2502        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2503        let node = format!("127.0.0.1:{}", addr.port());
2504
2505        let cli = Cli {
2506            node,
2507            token: None,
2508            command: Some(Commands::Resume {
2509                name: "test-session".into(),
2510            }),
2511            path: None,
2512        };
2513        let err = execute(&cli).await.unwrap_err();
2514        assert_eq!(err.to_string(), "session is not lost (status: active)");
2515    }
2516
2517    #[tokio::test]
2518    async fn test_execute_spawn_error_response() {
2519        use axum::{Router, http::StatusCode, routing::post};
2520
2521        let app = Router::new().route(
2522            "/api/v1/sessions",
2523            post(|| async {
2524                (
2525                    StatusCode::INTERNAL_SERVER_ERROR,
2526                    "{\"error\":\"failed to spawn session\"}",
2527                )
2528            }),
2529        );
2530        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2531        let addr = listener.local_addr().unwrap();
2532        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2533        let node = format!("127.0.0.1:{}", addr.port());
2534
2535        let cli = Cli {
2536            node,
2537            token: None,
2538            command: Some(Commands::Spawn {
2539                name: Some("test".into()),
2540                workdir: Some("/tmp/repo".into()),
2541                ink: None,
2542                description: None,
2543                detach: true,
2544                idle_threshold: None,
2545                auto: false,
2546                worktree: false,
2547                worktree_base: None,
2548                runtime: None,
2549                secret: vec![],
2550                command: vec!["test".into()],
2551            }),
2552            path: None,
2553        };
2554        let err = execute(&cli).await.unwrap_err();
2555        assert_eq!(err.to_string(), "failed to spawn session");
2556    }
2557
2558    #[tokio::test]
2559    async fn test_execute_interventions_error_response() {
2560        use axum::{Router, http::StatusCode, routing::get};
2561
2562        let app = Router::new().route(
2563            "/api/v1/sessions/{id}/interventions",
2564            get(|| async {
2565                (
2566                    StatusCode::NOT_FOUND,
2567                    "{\"error\":\"session not found: ghost\"}",
2568                )
2569            }),
2570        );
2571        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2572        let addr = listener.local_addr().unwrap();
2573        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2574        let node = format!("127.0.0.1:{}", addr.port());
2575
2576        let cli = Cli {
2577            node,
2578            token: None,
2579            command: Some(Commands::Interventions {
2580                name: "ghost".into(),
2581            }),
2582            path: None,
2583        };
2584        let err = execute(&cli).await.unwrap_err();
2585        assert_eq!(err.to_string(), "session not found: ghost");
2586    }
2587
2588    #[tokio::test]
2589    async fn test_execute_resume_success() {
2590        let node = start_test_server().await;
2591        let cli = Cli {
2592            node,
2593            token: None,
2594            command: Some(Commands::Resume {
2595                name: "test-session".into(),
2596            }),
2597            path: None,
2598        };
2599        let result = execute(&cli).await.unwrap();
2600        assert!(result.contains("Detached from session"));
2601    }
2602
2603    #[tokio::test]
2604    async fn test_execute_input_success() {
2605        let node = start_test_server().await;
2606        let cli = Cli {
2607            node,
2608            token: None,
2609            command: Some(Commands::Input {
2610                name: "test-session".into(),
2611                text: Some("yes".into()),
2612            }),
2613            path: None,
2614        };
2615        let result = execute(&cli).await.unwrap();
2616        assert!(result.contains("Sent input to session test-session"));
2617    }
2618
2619    #[tokio::test]
2620    async fn test_execute_input_no_text() {
2621        let node = start_test_server().await;
2622        let cli = Cli {
2623            node,
2624            token: None,
2625            command: Some(Commands::Input {
2626                name: "test-session".into(),
2627                text: None,
2628            }),
2629            path: None,
2630        };
2631        let result = execute(&cli).await.unwrap();
2632        assert!(result.contains("Sent input to session test-session"));
2633    }
2634
2635    #[tokio::test]
2636    async fn test_execute_input_connection_refused() {
2637        let cli = Cli {
2638            node: "localhost:1".into(),
2639            token: None,
2640            command: Some(Commands::Input {
2641                name: "test".into(),
2642                text: Some("y".into()),
2643            }),
2644            path: None,
2645        };
2646        let result = execute(&cli).await;
2647        let err = result.unwrap_err().to_string();
2648        assert!(err.contains("Could not connect to pulpod"));
2649    }
2650
2651    #[tokio::test]
2652    async fn test_execute_input_error_response() {
2653        use axum::{Router, http::StatusCode, routing::post};
2654
2655        let app = Router::new().route(
2656            "/api/v1/sessions/{id}/input",
2657            post(|| async {
2658                (
2659                    StatusCode::NOT_FOUND,
2660                    "{\"error\":\"session not found: ghost\"}",
2661                )
2662            }),
2663        );
2664        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2665        let addr = listener.local_addr().unwrap();
2666        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2667        let node = format!("127.0.0.1:{}", addr.port());
2668
2669        let cli = Cli {
2670            node,
2671            token: None,
2672            command: Some(Commands::Input {
2673                name: "ghost".into(),
2674                text: Some("y".into()),
2675            }),
2676            path: None,
2677        };
2678        let err = execute(&cli).await.unwrap_err();
2679        assert_eq!(err.to_string(), "session not found: ghost");
2680    }
2681
2682    #[tokio::test]
2683    async fn test_execute_ui() {
2684        let cli = Cli {
2685            node: "localhost:7433".into(),
2686            token: None,
2687            command: Some(Commands::Ui),
2688            path: None,
2689        };
2690        let result = execute(&cli).await.unwrap();
2691        assert!(result.contains("Opening"));
2692        assert!(result.contains("http://localhost:7433"));
2693    }
2694
2695    #[tokio::test]
2696    async fn test_execute_ui_custom_node() {
2697        let cli = Cli {
2698            node: "mac-mini:7433".into(),
2699            token: None,
2700            command: Some(Commands::Ui),
2701            path: None,
2702        };
2703        let result = execute(&cli).await.unwrap();
2704        assert!(result.contains("http://mac-mini:7433"));
2705    }
2706
2707    #[test]
2708    fn test_format_sessions_empty() {
2709        assert_eq!(format_sessions(&[]), "No sessions.");
2710    }
2711
2712    #[test]
2713    fn test_format_sessions_with_data() {
2714        use chrono::Utc;
2715        use pulpo_common::session::SessionStatus;
2716        use uuid::Uuid;
2717
2718        let sessions = vec![Session {
2719            id: Uuid::nil(),
2720            name: "my-api".into(),
2721            workdir: "/tmp/repo".into(),
2722            command: "claude -p 'Fix the bug'".into(),
2723            description: Some("Fix the bug".into()),
2724            status: SessionStatus::Active,
2725            exit_code: None,
2726            backend_session_id: None,
2727            output_snapshot: None,
2728            metadata: None,
2729            ink: None,
2730            intervention_code: None,
2731            intervention_reason: None,
2732            intervention_at: None,
2733            last_output_at: None,
2734            idle_since: None,
2735            idle_threshold_secs: None,
2736            worktree_path: None,
2737            worktree_branch: None,
2738            runtime: Runtime::Tmux,
2739            created_at: Utc::now(),
2740            updated_at: Utc::now(),
2741        }];
2742        let output = format_sessions(&sessions);
2743        assert!(output.contains("ID"));
2744        assert!(output.contains("NAME"));
2745        assert!(output.contains("RUNTIME"));
2746        assert!(output.contains("COMMAND"));
2747        assert!(output.contains("00000000"));
2748        assert!(output.contains("my-api"));
2749        assert!(output.contains("active"));
2750        assert!(output.contains("tmux"));
2751        assert!(output.contains("claude -p 'Fix the bug'"));
2752    }
2753
2754    #[test]
2755    fn test_format_sessions_docker_runtime() {
2756        use chrono::Utc;
2757        use pulpo_common::session::SessionStatus;
2758        use uuid::Uuid;
2759
2760        let sessions = vec![Session {
2761            id: Uuid::nil(),
2762            name: "sandbox-test".into(),
2763            workdir: "/tmp".into(),
2764            command: "claude".into(),
2765            description: None,
2766            status: SessionStatus::Active,
2767            exit_code: None,
2768            backend_session_id: Some("docker:pulpo-sandbox-test".into()),
2769            output_snapshot: None,
2770            metadata: None,
2771            ink: None,
2772            intervention_code: None,
2773            intervention_reason: None,
2774            intervention_at: None,
2775            last_output_at: None,
2776            idle_since: None,
2777            idle_threshold_secs: None,
2778            worktree_path: None,
2779            worktree_branch: None,
2780            runtime: Runtime::Docker,
2781            created_at: Utc::now(),
2782            updated_at: Utc::now(),
2783        }];
2784        let output = format_sessions(&sessions);
2785        assert!(output.contains("docker"));
2786    }
2787
2788    #[test]
2789    fn test_format_sessions_long_command_truncated() {
2790        use chrono::Utc;
2791        use pulpo_common::session::SessionStatus;
2792        use uuid::Uuid;
2793
2794        let sessions = vec![Session {
2795            id: Uuid::nil(),
2796            name: "test".into(),
2797            workdir: "/tmp".into(),
2798            command:
2799                "claude -p 'A very long command that exceeds fifty characters in total length here'"
2800                    .into(),
2801            description: None,
2802            status: SessionStatus::Ready,
2803            exit_code: None,
2804            backend_session_id: None,
2805            output_snapshot: None,
2806            metadata: None,
2807            ink: None,
2808            intervention_code: None,
2809            intervention_reason: None,
2810            intervention_at: None,
2811            last_output_at: None,
2812            idle_since: None,
2813            idle_threshold_secs: None,
2814            worktree_path: None,
2815            worktree_branch: None,
2816            runtime: Runtime::Tmux,
2817            created_at: Utc::now(),
2818            updated_at: Utc::now(),
2819        }];
2820        let output = format_sessions(&sessions);
2821        assert!(output.contains("..."));
2822    }
2823
2824    #[test]
2825    fn test_format_sessions_worktree_indicator() {
2826        use chrono::Utc;
2827        use pulpo_common::session::SessionStatus;
2828        use uuid::Uuid;
2829
2830        let sessions = vec![Session {
2831            id: Uuid::nil(),
2832            name: "wt-task".into(),
2833            workdir: "/repo".into(),
2834            command: "claude".into(),
2835            description: None,
2836            status: SessionStatus::Active,
2837            exit_code: None,
2838            backend_session_id: None,
2839            output_snapshot: None,
2840            metadata: None,
2841            ink: None,
2842            intervention_code: None,
2843            intervention_reason: None,
2844            intervention_at: None,
2845            last_output_at: None,
2846            idle_since: None,
2847            idle_threshold_secs: None,
2848            worktree_path: Some("/home/user/.pulpo/worktrees/wt-task".into()),
2849            worktree_branch: Some("wt-task".into()),
2850            runtime: Runtime::Tmux,
2851            created_at: Utc::now(),
2852            updated_at: Utc::now(),
2853        }];
2854        let output = format_sessions(&sessions);
2855        assert!(
2856            output.contains("[wt]"),
2857            "should show worktree indicator: {output}"
2858        );
2859        assert!(output.contains("wt-task [wt]"));
2860    }
2861
2862    #[test]
2863    fn test_format_sessions_pr_indicator() {
2864        use chrono::Utc;
2865        use pulpo_common::session::SessionStatus;
2866        use std::collections::HashMap;
2867        use uuid::Uuid;
2868
2869        let mut meta = HashMap::new();
2870        meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
2871        let sessions = vec![Session {
2872            id: Uuid::nil(),
2873            name: "pr-task".into(),
2874            workdir: "/tmp".into(),
2875            command: "claude".into(),
2876            description: None,
2877            status: SessionStatus::Active,
2878            exit_code: None,
2879            backend_session_id: None,
2880            output_snapshot: None,
2881            metadata: Some(meta),
2882            ink: None,
2883            intervention_code: None,
2884            intervention_reason: None,
2885            intervention_at: None,
2886            last_output_at: None,
2887            idle_since: None,
2888            idle_threshold_secs: None,
2889            worktree_path: None,
2890            worktree_branch: None,
2891            runtime: Runtime::Tmux,
2892            created_at: Utc::now(),
2893            updated_at: Utc::now(),
2894        }];
2895        let output = format_sessions(&sessions);
2896        assert!(
2897            output.contains("[PR]"),
2898            "should show PR indicator: {output}"
2899        );
2900        assert!(output.contains("pr-task [PR]"));
2901    }
2902
2903    #[test]
2904    fn test_format_sessions_worktree_and_pr_indicator() {
2905        use chrono::Utc;
2906        use pulpo_common::session::SessionStatus;
2907        use std::collections::HashMap;
2908        use uuid::Uuid;
2909
2910        let mut meta = HashMap::new();
2911        meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
2912        let sessions = vec![Session {
2913            id: Uuid::nil(),
2914            name: "both-task".into(),
2915            workdir: "/tmp".into(),
2916            command: "claude".into(),
2917            description: None,
2918            status: SessionStatus::Active,
2919            exit_code: None,
2920            backend_session_id: None,
2921            output_snapshot: None,
2922            metadata: Some(meta),
2923            ink: None,
2924            intervention_code: None,
2925            intervention_reason: None,
2926            intervention_at: None,
2927            last_output_at: None,
2928            idle_since: None,
2929            idle_threshold_secs: None,
2930            worktree_path: Some("/home/user/.pulpo/worktrees/both-task".into()),
2931            worktree_branch: Some("both-task".into()),
2932            runtime: Runtime::Tmux,
2933            created_at: Utc::now(),
2934            updated_at: Utc::now(),
2935        }];
2936        let output = format_sessions(&sessions);
2937        assert!(
2938            output.contains("[wt] [PR]"),
2939            "should show both indicators: {output}"
2940        );
2941    }
2942
2943    #[test]
2944    fn test_format_sessions_no_pr_without_metadata() {
2945        use chrono::Utc;
2946        use pulpo_common::session::SessionStatus;
2947        use uuid::Uuid;
2948
2949        let sessions = vec![Session {
2950            id: Uuid::nil(),
2951            name: "no-pr".into(),
2952            workdir: "/tmp".into(),
2953            command: "claude".into(),
2954            description: None,
2955            status: SessionStatus::Active,
2956            exit_code: None,
2957            backend_session_id: None,
2958            output_snapshot: None,
2959            metadata: None,
2960            ink: None,
2961            intervention_code: None,
2962            intervention_reason: None,
2963            intervention_at: None,
2964            last_output_at: None,
2965            idle_since: None,
2966            idle_threshold_secs: None,
2967            worktree_path: None,
2968            worktree_branch: None,
2969            runtime: Runtime::Tmux,
2970            created_at: Utc::now(),
2971            updated_at: Utc::now(),
2972        }];
2973        let output = format_sessions(&sessions);
2974        assert!(
2975            !output.contains("[PR]"),
2976            "should not show PR indicator: {output}"
2977        );
2978    }
2979
2980    #[test]
2981    fn test_format_nodes() {
2982        use pulpo_common::node::NodeInfo;
2983        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2984
2985        let resp = PeersResponse {
2986            local: NodeInfo {
2987                name: "mac-mini".into(),
2988                hostname: "h".into(),
2989                os: "macos".into(),
2990                arch: "arm64".into(),
2991                cpus: 8,
2992                memory_mb: 16384,
2993                gpu: None,
2994            },
2995            peers: vec![PeerInfo {
2996                name: "win-pc".into(),
2997                address: "win-pc:7433".into(),
2998                status: PeerStatus::Online,
2999                node_info: None,
3000                session_count: Some(3),
3001                source: PeerSource::Configured,
3002            }],
3003        };
3004        let output = format_nodes(&resp);
3005        assert!(output.contains("mac-mini"));
3006        assert!(output.contains("(local)"));
3007        assert!(output.contains("win-pc"));
3008        assert!(output.contains('3'));
3009    }
3010
3011    #[test]
3012    fn test_format_nodes_no_session_count() {
3013        use pulpo_common::node::NodeInfo;
3014        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
3015
3016        let resp = PeersResponse {
3017            local: NodeInfo {
3018                name: "local".into(),
3019                hostname: "h".into(),
3020                os: "linux".into(),
3021                arch: "x86_64".into(),
3022                cpus: 4,
3023                memory_mb: 8192,
3024                gpu: None,
3025            },
3026            peers: vec![PeerInfo {
3027                name: "peer".into(),
3028                address: "peer:7433".into(),
3029                status: PeerStatus::Offline,
3030                node_info: None,
3031                session_count: None,
3032                source: PeerSource::Configured,
3033            }],
3034        };
3035        let output = format_nodes(&resp);
3036        assert!(output.contains("offline"));
3037        // No session count → shows "-"
3038        let lines: Vec<&str> = output.lines().collect();
3039        assert!(lines[2].contains('-'));
3040    }
3041
3042    #[tokio::test]
3043    async fn test_execute_resume_connection_refused() {
3044        let cli = Cli {
3045            node: "localhost:1".into(),
3046            token: None,
3047            command: Some(Commands::Resume {
3048                name: "test".into(),
3049            }),
3050            path: None,
3051        };
3052        let result = execute(&cli).await;
3053        let err = result.unwrap_err().to_string();
3054        assert!(err.contains("Could not connect to pulpod"));
3055    }
3056
3057    #[tokio::test]
3058    async fn test_execute_spawn_connection_refused() {
3059        let cli = Cli {
3060            node: "localhost:1".into(),
3061            token: None,
3062            command: Some(Commands::Spawn {
3063                name: Some("test".into()),
3064                workdir: Some("/tmp".into()),
3065                ink: None,
3066                description: None,
3067                detach: true,
3068                idle_threshold: None,
3069                auto: false,
3070                worktree: false,
3071                worktree_base: None,
3072                runtime: None,
3073                secret: vec![],
3074                command: vec!["test".into()],
3075            }),
3076            path: None,
3077        };
3078        let result = execute(&cli).await;
3079        let err = result.unwrap_err().to_string();
3080        assert!(err.contains("Could not connect to pulpod"));
3081    }
3082
3083    #[tokio::test]
3084    async fn test_execute_stop_connection_refused() {
3085        let cli = Cli {
3086            node: "localhost:1".into(),
3087            token: None,
3088            command: Some(Commands::Stop {
3089                names: vec!["test".into()],
3090                purge: false,
3091            }),
3092            path: None,
3093        };
3094        let result = execute(&cli).await;
3095        let err = result.unwrap_err().to_string();
3096        assert!(err.contains("Could not connect to pulpod"));
3097    }
3098
3099    #[tokio::test]
3100    async fn test_execute_logs_connection_refused() {
3101        let cli = Cli {
3102            node: "localhost:1".into(),
3103            token: None,
3104            command: Some(Commands::Logs {
3105                name: "test".into(),
3106                lines: 50,
3107                follow: false,
3108            }),
3109            path: None,
3110        };
3111        let result = execute(&cli).await;
3112        let err = result.unwrap_err().to_string();
3113        assert!(err.contains("Could not connect to pulpod"));
3114    }
3115
3116    #[tokio::test]
3117    async fn test_friendly_error_connect() {
3118        // Make a request to a closed port to get a connect error
3119        let err = reqwest::Client::new()
3120            .get("http://127.0.0.1:1")
3121            .send()
3122            .await
3123            .unwrap_err();
3124        let friendly = friendly_error(&err, "test-node:1");
3125        let msg = friendly.to_string();
3126        assert!(
3127            msg.contains("Could not connect"),
3128            "Expected connect message, got: {msg}"
3129        );
3130    }
3131
3132    #[tokio::test]
3133    async fn test_friendly_error_other() {
3134        // A request to an invalid URL creates a builder error, not a connect error
3135        let err = reqwest::Client::new()
3136            .get("http://[::invalid::url")
3137            .send()
3138            .await
3139            .unwrap_err();
3140        let friendly = friendly_error(&err, "bad-host");
3141        let msg = friendly.to_string();
3142        assert!(
3143            msg.contains("Network error"),
3144            "Expected network error message, got: {msg}"
3145        );
3146        assert!(msg.contains("bad-host"));
3147    }
3148
3149    // -- Auth helper tests --
3150
3151    #[test]
3152    fn test_is_localhost_variants() {
3153        assert!(is_localhost("localhost:7433"));
3154        assert!(is_localhost("127.0.0.1:7433"));
3155        assert!(is_localhost("[::1]:7433"));
3156        assert!(is_localhost("::1"));
3157        assert!(is_localhost("localhost"));
3158        assert!(!is_localhost("mac-mini:7433"));
3159        assert!(!is_localhost("192.168.1.100:7433"));
3160    }
3161
3162    #[test]
3163    fn test_authed_get_with_token() {
3164        let client = reqwest::Client::new();
3165        let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
3166            .build()
3167            .unwrap();
3168        let auth = req
3169            .headers()
3170            .get("authorization")
3171            .unwrap()
3172            .to_str()
3173            .unwrap();
3174        assert_eq!(auth, "Bearer tok");
3175    }
3176
3177    #[test]
3178    fn test_authed_get_without_token() {
3179        let client = reqwest::Client::new();
3180        let req = authed_get(&client, "http://h:1/api".into(), None)
3181            .build()
3182            .unwrap();
3183        assert!(req.headers().get("authorization").is_none());
3184    }
3185
3186    #[test]
3187    fn test_authed_post_with_token() {
3188        let client = reqwest::Client::new();
3189        let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
3190            .build()
3191            .unwrap();
3192        let auth = req
3193            .headers()
3194            .get("authorization")
3195            .unwrap()
3196            .to_str()
3197            .unwrap();
3198        assert_eq!(auth, "Bearer secret");
3199    }
3200
3201    #[test]
3202    fn test_authed_post_without_token() {
3203        let client = reqwest::Client::new();
3204        let req = authed_post(&client, "http://h:1/api".into(), None)
3205            .build()
3206            .unwrap();
3207        assert!(req.headers().get("authorization").is_none());
3208    }
3209
3210    #[test]
3211    fn test_authed_delete_with_token() {
3212        let client = reqwest::Client::new();
3213        let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
3214            .build()
3215            .unwrap();
3216        let auth = req
3217            .headers()
3218            .get("authorization")
3219            .unwrap()
3220            .to_str()
3221            .unwrap();
3222        assert_eq!(auth, "Bearer del-tok");
3223    }
3224
3225    #[test]
3226    fn test_authed_delete_without_token() {
3227        let client = reqwest::Client::new();
3228        let req = authed_delete(&client, "http://h:1/api".into(), None)
3229            .build()
3230            .unwrap();
3231        assert!(req.headers().get("authorization").is_none());
3232    }
3233
3234    #[tokio::test]
3235    async fn test_resolve_token_explicit() {
3236        let client = reqwest::Client::new();
3237        let token =
3238            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
3239        assert_eq!(token, Some("my-tok".into()));
3240    }
3241
3242    #[tokio::test]
3243    async fn test_resolve_token_remote_no_explicit() {
3244        let client = reqwest::Client::new();
3245        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
3246        assert_eq!(token, None);
3247    }
3248
3249    #[tokio::test]
3250    async fn test_resolve_token_localhost_auto_discover() {
3251        use axum::{Json, Router, routing::get};
3252
3253        let app = Router::new().route(
3254            "/api/v1/auth/token",
3255            get(|| async {
3256                Json(AuthTokenResponse {
3257                    token: "discovered".into(),
3258                })
3259            }),
3260        );
3261        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3262        let addr = listener.local_addr().unwrap();
3263        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3264
3265        let node = format!("localhost:{}", addr.port());
3266        let base = base_url(&node);
3267        let client = reqwest::Client::new();
3268        let token = resolve_token(&client, &base, &node, None).await;
3269        assert_eq!(token, Some("discovered".into()));
3270    }
3271
3272    #[tokio::test]
3273    async fn test_discover_token_empty_returns_none() {
3274        use axum::{Json, Router, routing::get};
3275
3276        let app = Router::new().route(
3277            "/api/v1/auth/token",
3278            get(|| async {
3279                Json(AuthTokenResponse {
3280                    token: String::new(),
3281                })
3282            }),
3283        );
3284        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3285        let addr = listener.local_addr().unwrap();
3286        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3287
3288        let base = format!("http://127.0.0.1:{}", addr.port());
3289        let client = reqwest::Client::new();
3290        assert_eq!(discover_token(&client, &base).await, None);
3291    }
3292
3293    #[tokio::test]
3294    async fn test_discover_token_unreachable_returns_none() {
3295        let client = reqwest::Client::new();
3296        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3297    }
3298
3299    #[test]
3300    fn test_cli_parse_with_token() {
3301        let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
3302        assert_eq!(cli.token, Some("my-secret".into()));
3303    }
3304
3305    #[test]
3306    fn test_cli_parse_without_token() {
3307        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
3308        assert_eq!(cli.token, None);
3309    }
3310
3311    #[tokio::test]
3312    async fn test_execute_with_explicit_token_sends_header() {
3313        use axum::{Router, extract::Request, http::StatusCode, routing::get};
3314
3315        let app = Router::new().route(
3316            "/api/v1/sessions",
3317            get(|req: Request| async move {
3318                let auth = req
3319                    .headers()
3320                    .get("authorization")
3321                    .and_then(|v| v.to_str().ok())
3322                    .unwrap_or("");
3323                assert_eq!(auth, "Bearer test-token");
3324                (StatusCode::OK, "[]".to_owned())
3325            }),
3326        );
3327        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3328        let addr = listener.local_addr().unwrap();
3329        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3330        let node = format!("127.0.0.1:{}", addr.port());
3331
3332        let cli = Cli {
3333            node,
3334            token: Some("test-token".into()),
3335            command: Some(Commands::List { all: false }),
3336            path: None,
3337        };
3338        let result = execute(&cli).await.unwrap();
3339        assert_eq!(result, "No sessions.");
3340    }
3341
3342    // -- Interventions tests --
3343
3344    #[test]
3345    fn test_cli_parse_interventions() {
3346        let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
3347        assert!(matches!(
3348            &cli.command,
3349            Some(Commands::Interventions { name }) if name == "my-session"
3350        ));
3351    }
3352
3353    #[test]
3354    fn test_format_interventions_empty() {
3355        assert_eq!(format_interventions(&[]), "No intervention events.");
3356    }
3357
3358    #[test]
3359    fn test_format_interventions_with_data() {
3360        let events = vec![
3361            InterventionEventResponse {
3362                id: 1,
3363                session_id: "sess-1".into(),
3364                code: None,
3365                reason: "Memory exceeded threshold".into(),
3366                created_at: "2026-01-01T00:00:00Z".into(),
3367            },
3368            InterventionEventResponse {
3369                id: 2,
3370                session_id: "sess-1".into(),
3371                code: None,
3372                reason: "Idle for 10 minutes".into(),
3373                created_at: "2026-01-02T00:00:00Z".into(),
3374            },
3375        ];
3376        let output = format_interventions(&events);
3377        assert!(output.contains("ID"));
3378        assert!(output.contains("TIMESTAMP"));
3379        assert!(output.contains("REASON"));
3380        assert!(output.contains("Memory exceeded threshold"));
3381        assert!(output.contains("Idle for 10 minutes"));
3382        assert!(output.contains("2026-01-01T00:00:00Z"));
3383    }
3384
3385    #[tokio::test]
3386    async fn test_execute_interventions_empty() {
3387        let node = start_test_server().await;
3388        let cli = Cli {
3389            node,
3390            token: None,
3391            command: Some(Commands::Interventions {
3392                name: "my-session".into(),
3393            }),
3394            path: None,
3395        };
3396        let result = execute(&cli).await.unwrap();
3397        assert_eq!(result, "No intervention events.");
3398    }
3399
3400    #[tokio::test]
3401    async fn test_execute_interventions_with_data() {
3402        use axum::{Router, routing::get};
3403
3404        let app = Router::new().route(
3405            "/api/v1/sessions/{id}/interventions",
3406            get(|| async {
3407                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3408                    .to_owned()
3409            }),
3410        );
3411        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3412        let addr = listener.local_addr().unwrap();
3413        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3414        let node = format!("127.0.0.1:{}", addr.port());
3415
3416        let cli = Cli {
3417            node,
3418            token: None,
3419            command: Some(Commands::Interventions {
3420                name: "test".into(),
3421            }),
3422            path: None,
3423        };
3424        let result = execute(&cli).await.unwrap();
3425        assert!(result.contains("OOM"));
3426        assert!(result.contains("2026-01-01T00:00:00Z"));
3427    }
3428
3429    #[tokio::test]
3430    async fn test_execute_interventions_connection_refused() {
3431        let cli = Cli {
3432            node: "localhost:1".into(),
3433            token: None,
3434            command: Some(Commands::Interventions {
3435                name: "test".into(),
3436            }),
3437            path: None,
3438        };
3439        let result = execute(&cli).await;
3440        let err = result.unwrap_err().to_string();
3441        assert!(err.contains("Could not connect to pulpod"));
3442    }
3443
3444    // -- Attach command tests --
3445
3446    #[test]
3447    fn test_build_attach_command_tmux() {
3448        let cmd = build_attach_command("my-session");
3449        assert_eq!(cmd.get_program(), "tmux");
3450        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3451        assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
3452    }
3453
3454    #[test]
3455    fn test_build_attach_command_docker() {
3456        let cmd = build_attach_command("docker:pulpo-my-task");
3457        assert_eq!(cmd.get_program(), "docker");
3458        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3459        assert_eq!(args, vec!["exec", "-it", "pulpo-my-task", "/bin/sh"]);
3460    }
3461
3462    #[test]
3463    fn test_cli_parse_attach() {
3464        let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
3465        assert!(matches!(
3466            &cli.command,
3467            Some(Commands::Attach { name }) if name == "my-session"
3468        ));
3469    }
3470
3471    #[test]
3472    fn test_cli_parse_attach_alias() {
3473        let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
3474        assert!(matches!(
3475            &cli.command,
3476            Some(Commands::Attach { name }) if name == "my-session"
3477        ));
3478    }
3479
3480    #[tokio::test]
3481    async fn test_execute_attach_success() {
3482        let node = start_test_server().await;
3483        let cli = Cli {
3484            node,
3485            token: None,
3486            command: Some(Commands::Attach {
3487                name: "test-session".into(),
3488            }),
3489            path: None,
3490        };
3491        let result = execute(&cli).await.unwrap();
3492        assert!(result.contains("Detached from session test-session"));
3493    }
3494
3495    #[tokio::test]
3496    async fn test_execute_attach_with_backend_session_id() {
3497        use axum::{Router, routing::get};
3498        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3499        let app = Router::new().route(
3500            "/api/v1/sessions/{id}",
3501            get(move || async move { session_json.to_owned() }),
3502        );
3503        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3504        let addr = listener.local_addr().unwrap();
3505        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3506
3507        let cli = Cli {
3508            node: format!("127.0.0.1:{}", addr.port()),
3509            token: None,
3510            command: Some(Commands::Attach {
3511                name: "my-session".into(),
3512            }),
3513            path: None,
3514        };
3515        let result = execute(&cli).await.unwrap();
3516        assert!(result.contains("Detached from session my-session"));
3517    }
3518
3519    #[tokio::test]
3520    async fn test_execute_attach_connection_refused() {
3521        let cli = Cli {
3522            node: "localhost:1".into(),
3523            token: None,
3524            command: Some(Commands::Attach {
3525                name: "test-session".into(),
3526            }),
3527            path: None,
3528        };
3529        let result = execute(&cli).await;
3530        let err = result.unwrap_err().to_string();
3531        assert!(err.contains("Could not connect to pulpod"));
3532    }
3533
3534    #[tokio::test]
3535    async fn test_execute_attach_error_response() {
3536        use axum::{Router, http::StatusCode, routing::get};
3537        let app = Router::new().route(
3538            "/api/v1/sessions/{id}",
3539            get(|| async {
3540                (
3541                    StatusCode::NOT_FOUND,
3542                    r#"{"error":"session not found"}"#.to_owned(),
3543                )
3544            }),
3545        );
3546        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3547        let addr = listener.local_addr().unwrap();
3548        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3549
3550        let cli = Cli {
3551            node: format!("127.0.0.1:{}", addr.port()),
3552            token: None,
3553            command: Some(Commands::Attach {
3554                name: "nonexistent".into(),
3555            }),
3556            path: None,
3557        };
3558        let result = execute(&cli).await;
3559        let err = result.unwrap_err().to_string();
3560        assert!(err.contains("session not found"));
3561    }
3562
3563    #[tokio::test]
3564    async fn test_execute_attach_stale_session() {
3565        use axum::{Router, routing::get};
3566        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3567        let app = Router::new().route(
3568            "/api/v1/sessions/{id}",
3569            get(move || async move { session_json.to_owned() }),
3570        );
3571        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3572        let addr = listener.local_addr().unwrap();
3573        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3574
3575        let cli = Cli {
3576            node: format!("127.0.0.1:{}", addr.port()),
3577            token: None,
3578            command: Some(Commands::Attach {
3579                name: "stale-sess".into(),
3580            }),
3581            path: None,
3582        };
3583        let result = execute(&cli).await;
3584        let err = result.unwrap_err().to_string();
3585        assert!(err.contains("lost"));
3586        assert!(err.contains("pulpo resume"));
3587    }
3588
3589    #[tokio::test]
3590    async fn test_execute_attach_dead_session() {
3591        use axum::{Router, routing::get};
3592        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3593        let app = Router::new().route(
3594            "/api/v1/sessions/{id}",
3595            get(move || async move { session_json.to_owned() }),
3596        );
3597        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3598        let addr = listener.local_addr().unwrap();
3599        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3600
3601        let cli = Cli {
3602            node: format!("127.0.0.1:{}", addr.port()),
3603            token: None,
3604            command: Some(Commands::Attach {
3605                name: "dead-sess".into(),
3606            }),
3607            path: None,
3608        };
3609        let result = execute(&cli).await;
3610        let err = result.unwrap_err().to_string();
3611        assert!(err.contains("stopped"));
3612        assert!(err.contains("cannot attach"));
3613    }
3614
3615    // -- Alias parse tests --
3616
3617    #[test]
3618    fn test_cli_parse_alias_spawn() {
3619        let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
3620        assert!(matches!(&cli.command, Some(Commands::Spawn { .. })));
3621    }
3622
3623    #[test]
3624    fn test_cli_parse_alias_list() {
3625        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
3626        assert!(matches!(&cli.command, Some(Commands::List { all: false })));
3627    }
3628
3629    #[test]
3630    fn test_cli_parse_list_all() {
3631        let cli = Cli::try_parse_from(["pulpo", "ls", "-a"]).unwrap();
3632        assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3633
3634        let cli = Cli::try_parse_from(["pulpo", "list", "--all"]).unwrap();
3635        assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3636    }
3637
3638    #[test]
3639    fn test_cli_parse_alias_logs() {
3640        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
3641        assert!(matches!(
3642            &cli.command,
3643            Some(Commands::Logs { name, .. }) if name == "my-session"
3644        ));
3645    }
3646
3647    #[test]
3648    fn test_cli_parse_alias_stop() {
3649        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
3650        assert!(matches!(
3651            &cli.command,
3652            Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
3653        ));
3654    }
3655
3656    #[test]
3657    fn test_cli_parse_alias_resume() {
3658        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
3659        assert!(matches!(
3660            &cli.command,
3661            Some(Commands::Resume { name }) if name == "my-session"
3662        ));
3663    }
3664
3665    #[test]
3666    fn test_cli_parse_alias_nodes() {
3667        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
3668        assert!(matches!(&cli.command, Some(Commands::Nodes)));
3669    }
3670
3671    #[test]
3672    fn test_cli_parse_alias_interventions() {
3673        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
3674        assert!(matches!(
3675            &cli.command,
3676            Some(Commands::Interventions { name }) if name == "my-session"
3677        ));
3678    }
3679
3680    #[test]
3681    fn test_api_error_json() {
3682        let err = api_error("{\"error\":\"session not found: foo\"}");
3683        assert_eq!(err.to_string(), "session not found: foo");
3684    }
3685
3686    #[test]
3687    fn test_api_error_plain_text() {
3688        let err = api_error("plain text error");
3689        assert_eq!(err.to_string(), "plain text error");
3690    }
3691
3692    // -- diff_output tests --
3693
3694    #[test]
3695    fn test_diff_output_empty_prev() {
3696        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
3697    }
3698
3699    #[test]
3700    fn test_diff_output_identical() {
3701        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
3702    }
3703
3704    #[test]
3705    fn test_diff_output_new_lines_appended() {
3706        let prev = "line1\nline2";
3707        let new = "line1\nline2\nline3\nline4";
3708        assert_eq!(diff_output(prev, new), "line3\nline4");
3709    }
3710
3711    #[test]
3712    fn test_diff_output_scrolled_window() {
3713        // Window of 3 lines: old lines scroll off top, new appear at bottom
3714        let prev = "line1\nline2\nline3";
3715        let new = "line2\nline3\nline4";
3716        assert_eq!(diff_output(prev, new), "line4");
3717    }
3718
3719    #[test]
3720    fn test_diff_output_completely_different() {
3721        let prev = "aaa\nbbb";
3722        let new = "xxx\nyyy";
3723        assert_eq!(diff_output(prev, new), "xxx\nyyy");
3724    }
3725
3726    #[test]
3727    fn test_diff_output_last_line_matches_but_overlap_fails() {
3728        // Last line of prev appears in new but preceding lines don't match
3729        let prev = "aaa\ncommon";
3730        let new = "zzz\ncommon\nnew_line";
3731        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
3732        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
3733        // Falls through, no verified overlap, so returns everything
3734        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
3735    }
3736
3737    #[test]
3738    fn test_diff_output_new_empty() {
3739        assert_eq!(diff_output("line1", ""), "");
3740    }
3741
3742    // -- follow_logs tests --
3743
3744    /// Start a test server that simulates evolving output and session status transitions.
3745    /// Start a test server that simulates evolving output with agent exit marker.
3746    async fn start_follow_test_server() -> String {
3747        use axum::{Router, extract::Path, extract::Query, routing::get};
3748        use std::sync::Arc;
3749        use std::sync::atomic::{AtomicUsize, Ordering};
3750
3751        let call_count = Arc::new(AtomicUsize::new(0));
3752        let output_count = call_count.clone();
3753
3754        let app = Router::new()
3755            .route(
3756                "/api/v1/sessions/{id}/output",
3757                get(
3758                    move |_path: Path<String>,
3759                          _query: Query<std::collections::HashMap<String, String>>| {
3760                        let count = output_count.clone();
3761                        async move {
3762                            let n = count.fetch_add(1, Ordering::SeqCst);
3763                            let output = match n {
3764                                0 => "line1\nline2".to_owned(),
3765                                1 => "line1\nline2\nline3".to_owned(),
3766                                _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
3767                            };
3768                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
3769                        }
3770                    },
3771                ),
3772            )
3773            .route(
3774                "/api/v1/sessions/{id}",
3775                get(|_path: Path<String>| async {
3776                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3777                }),
3778            );
3779
3780        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3781        let addr = listener.local_addr().unwrap();
3782        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3783        format!("http://127.0.0.1:{}", addr.port())
3784    }
3785
3786    #[tokio::test]
3787    async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
3788        let base = start_follow_test_server().await;
3789        let client = reqwest::Client::new();
3790        let mut buf = Vec::new();
3791
3792        follow_logs(&client, &base, "test", 100, None, &mut buf)
3793            .await
3794            .unwrap();
3795
3796        let output = String::from_utf8(buf).unwrap();
3797        // Should contain initial output + new lines + agent exit marker
3798        assert!(output.contains("line1"));
3799        assert!(output.contains("line2"));
3800        assert!(output.contains("line3"));
3801        assert!(output.contains("line4"));
3802        assert!(output.contains("[pulpo] Agent exited"));
3803    }
3804
3805    #[tokio::test]
3806    async fn test_execute_logs_follow_success() {
3807        let base = start_follow_test_server().await;
3808        // Extract host:port from http://127.0.0.1:PORT
3809        let node = base.strip_prefix("http://").unwrap().to_owned();
3810
3811        let cli = Cli {
3812            node,
3813            token: None,
3814            command: Some(Commands::Logs {
3815                name: "test".into(),
3816                lines: 100,
3817                follow: true,
3818            }),
3819            path: None,
3820        };
3821        // execute() with follow writes to stdout and returns empty string
3822        let result = execute(&cli).await.unwrap();
3823        assert_eq!(result, "");
3824    }
3825
3826    #[tokio::test]
3827    async fn test_execute_logs_follow_connection_refused() {
3828        let cli = Cli {
3829            node: "localhost:1".into(),
3830            token: None,
3831            command: Some(Commands::Logs {
3832                name: "test".into(),
3833                lines: 50,
3834                follow: true,
3835            }),
3836            path: None,
3837        };
3838        let result = execute(&cli).await;
3839        let err = result.unwrap_err().to_string();
3840        assert!(
3841            err.contains("Could not connect to pulpod"),
3842            "Expected friendly error, got: {err}"
3843        );
3844    }
3845
3846    #[tokio::test]
3847    async fn test_follow_logs_exits_on_dead() {
3848        use axum::{Router, extract::Path, extract::Query, routing::get};
3849
3850        let app = Router::new()
3851            .route(
3852                "/api/v1/sessions/{id}/output",
3853                get(
3854                    |_path: Path<String>,
3855                     _query: Query<std::collections::HashMap<String, String>>| async {
3856                        r#"{"output":"some output"}"#.to_owned()
3857                    },
3858                ),
3859            )
3860            .route(
3861                "/api/v1/sessions/{id}",
3862                get(|_path: Path<String>| async {
3863                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3864                }),
3865            );
3866
3867        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3868        let addr = listener.local_addr().unwrap();
3869        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3870        let base = format!("http://127.0.0.1:{}", addr.port());
3871
3872        let client = reqwest::Client::new();
3873        let mut buf = Vec::new();
3874        follow_logs(&client, &base, "test", 100, None, &mut buf)
3875            .await
3876            .unwrap();
3877
3878        let output = String::from_utf8(buf).unwrap();
3879        assert!(output.contains("some output"));
3880    }
3881
3882    #[tokio::test]
3883    async fn test_follow_logs_exits_on_stale() {
3884        use axum::{Router, extract::Path, extract::Query, routing::get};
3885
3886        let app = Router::new()
3887            .route(
3888                "/api/v1/sessions/{id}/output",
3889                get(
3890                    |_path: Path<String>,
3891                     _query: Query<std::collections::HashMap<String, String>>| async {
3892                        r#"{"output":"stale output"}"#.to_owned()
3893                    },
3894                ),
3895            )
3896            .route(
3897                "/api/v1/sessions/{id}",
3898                get(|_path: Path<String>| async {
3899                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3900                }),
3901            );
3902
3903        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3904        let addr = listener.local_addr().unwrap();
3905        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3906        let base = format!("http://127.0.0.1:{}", addr.port());
3907
3908        let client = reqwest::Client::new();
3909        let mut buf = Vec::new();
3910        follow_logs(&client, &base, "test", 100, None, &mut buf)
3911            .await
3912            .unwrap();
3913
3914        let output = String::from_utf8(buf).unwrap();
3915        assert!(output.contains("stale output"));
3916    }
3917
3918    #[tokio::test]
3919    async fn test_execute_logs_follow_non_reqwest_error() {
3920        use axum::{Router, extract::Path, extract::Query, routing::get};
3921
3922        // Session status endpoint returns invalid JSON to trigger a serde error
3923        let app = Router::new()
3924            .route(
3925                "/api/v1/sessions/{id}/output",
3926                get(
3927                    |_path: Path<String>,
3928                     _query: Query<std::collections::HashMap<String, String>>| async {
3929                        r#"{"output":"initial"}"#.to_owned()
3930                    },
3931                ),
3932            )
3933            .route(
3934                "/api/v1/sessions/{id}",
3935                get(|_path: Path<String>| async { "not valid json".to_owned() }),
3936            );
3937
3938        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3939        let addr = listener.local_addr().unwrap();
3940        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3941        let node = format!("127.0.0.1:{}", addr.port());
3942
3943        let cli = Cli {
3944            node,
3945            token: None,
3946            command: Some(Commands::Logs {
3947                name: "test".into(),
3948                lines: 100,
3949                follow: true,
3950            }),
3951            path: None,
3952        };
3953        let err = execute(&cli).await.unwrap_err();
3954        // serde_json error, not a reqwest error — hits the Err(other) branch
3955        let msg = err.to_string();
3956        assert!(
3957            msg.contains("expected ident"),
3958            "Expected serde parse error, got: {msg}"
3959        );
3960    }
3961
3962    #[tokio::test]
3963    async fn test_fetch_session_status_connection_error() {
3964        let client = reqwest::Client::new();
3965        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3966        assert!(result.is_err());
3967    }
3968
3969    // -- Schedule tests --
3970
3971    #[test]
3972    fn test_format_schedules_empty() {
3973        assert_eq!(format_schedules(&[]), "No schedules.");
3974    }
3975
3976    #[test]
3977    fn test_format_schedules_with_entries() {
3978        let schedules = vec![serde_json::json!({
3979            "name": "nightly",
3980            "cron": "0 3 * * *",
3981            "enabled": true,
3982            "last_run_at": null,
3983            "target_node": null
3984        })];
3985        let output = format_schedules(&schedules);
3986        assert!(output.contains("nightly"));
3987        assert!(output.contains("0 3 * * *"));
3988        assert!(output.contains("local"));
3989        assert!(output.contains("yes"));
3990        assert!(output.contains('-'));
3991    }
3992
3993    #[test]
3994    fn test_format_schedules_disabled_entry() {
3995        let schedules = vec![serde_json::json!({
3996            "name": "weekly",
3997            "cron": "0 0 * * 0",
3998            "enabled": false,
3999            "last_run_at": "2026-03-18T03:00:00Z",
4000            "target_node": "gpu-box"
4001        })];
4002        let output = format_schedules(&schedules);
4003        assert!(output.contains("weekly"));
4004        assert!(output.contains("no"));
4005        assert!(output.contains("gpu-box"));
4006        assert!(output.contains("2026-03-18T03:00"));
4007    }
4008
4009    #[test]
4010    fn test_format_schedules_header() {
4011        let schedules = vec![serde_json::json!({
4012            "name": "test",
4013            "cron": "* * * * *",
4014            "enabled": true,
4015            "last_run_at": null,
4016            "target_node": null
4017        })];
4018        let output = format_schedules(&schedules);
4019        assert!(output.contains("NAME"));
4020        assert!(output.contains("CRON"));
4021        assert!(output.contains("ENABLED"));
4022        assert!(output.contains("LAST RUN"));
4023        assert!(output.contains("NODE"));
4024    }
4025
4026    // -- Schedule CLI parse tests --
4027
4028    #[test]
4029    fn test_cli_parse_schedule_add() {
4030        let cli = Cli::try_parse_from([
4031            "pulpo",
4032            "schedule",
4033            "add",
4034            "nightly",
4035            "0 3 * * *",
4036            "--workdir",
4037            "/repo",
4038            "--",
4039            "claude",
4040            "-p",
4041            "review",
4042        ])
4043        .unwrap();
4044        assert!(matches!(
4045            &cli.command,
4046            Some(Commands::Schedule {
4047                action: ScheduleAction::Add { name, cron, .. }
4048            }) if name == "nightly" && cron == "0 3 * * *"
4049        ));
4050    }
4051
4052    #[test]
4053    fn test_cli_parse_schedule_add_with_node() {
4054        let cli = Cli::try_parse_from([
4055            "pulpo",
4056            "schedule",
4057            "add",
4058            "nightly",
4059            "0 3 * * *",
4060            "--workdir",
4061            "/repo",
4062            "--node",
4063            "gpu-box",
4064            "--",
4065            "claude",
4066        ])
4067        .unwrap();
4068        assert!(matches!(
4069            &cli.command,
4070            Some(Commands::Schedule {
4071                action: ScheduleAction::Add { node, .. }
4072            }) if node.as_deref() == Some("gpu-box")
4073        ));
4074    }
4075
4076    #[test]
4077    fn test_cli_parse_schedule_add_install_alias() {
4078        let cli =
4079            Cli::try_parse_from(["pulpo", "schedule", "install", "nightly", "0 3 * * *"]).unwrap();
4080        assert!(matches!(
4081            &cli.command,
4082            Some(Commands::Schedule {
4083                action: ScheduleAction::Add { name, .. }
4084            }) if name == "nightly"
4085        ));
4086    }
4087
4088    #[test]
4089    fn test_cli_parse_schedule_list() {
4090        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
4091        assert!(matches!(
4092            &cli.command,
4093            Some(Commands::Schedule {
4094                action: ScheduleAction::List
4095            })
4096        ));
4097    }
4098
4099    #[test]
4100    fn test_cli_parse_schedule_remove() {
4101        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
4102        assert!(matches!(
4103            &cli.command,
4104            Some(Commands::Schedule {
4105                action: ScheduleAction::Remove { name }
4106            }) if name == "nightly"
4107        ));
4108    }
4109
4110    #[test]
4111    fn test_cli_parse_schedule_pause() {
4112        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
4113        assert!(matches!(
4114            &cli.command,
4115            Some(Commands::Schedule {
4116                action: ScheduleAction::Pause { name }
4117            }) if name == "nightly"
4118        ));
4119    }
4120
4121    #[test]
4122    fn test_cli_parse_schedule_resume() {
4123        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
4124        assert!(matches!(
4125            &cli.command,
4126            Some(Commands::Schedule {
4127                action: ScheduleAction::Resume { name }
4128            }) if name == "nightly"
4129        ));
4130    }
4131
4132    #[test]
4133    fn test_cli_parse_schedule_alias() {
4134        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
4135        assert!(matches!(
4136            &cli.command,
4137            Some(Commands::Schedule {
4138                action: ScheduleAction::List
4139            })
4140        ));
4141    }
4142
4143    #[test]
4144    fn test_cli_parse_schedule_list_alias() {
4145        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
4146        assert!(matches!(
4147            &cli.command,
4148            Some(Commands::Schedule {
4149                action: ScheduleAction::List
4150            })
4151        ));
4152    }
4153
4154    #[test]
4155    fn test_cli_parse_schedule_remove_alias() {
4156        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
4157        assert!(matches!(
4158            &cli.command,
4159            Some(Commands::Schedule {
4160                action: ScheduleAction::Remove { name }
4161            }) if name == "nightly"
4162        ));
4163    }
4164
4165    #[tokio::test]
4166    async fn test_execute_schedule_list_via_execute() {
4167        let node = start_test_server().await;
4168        let cli = Cli {
4169            node,
4170            token: None,
4171            command: Some(Commands::Schedule {
4172                action: ScheduleAction::List,
4173            }),
4174            path: None,
4175        };
4176        let result = execute(&cli).await.unwrap();
4177        // Under coverage, execute_schedule is a stub that returns empty string
4178        #[cfg(coverage)]
4179        assert!(result.is_empty());
4180        #[cfg(not(coverage))]
4181        assert_eq!(result, "No schedules.");
4182    }
4183
4184    #[test]
4185    fn test_schedule_action_debug() {
4186        let action = ScheduleAction::List;
4187        assert_eq!(format!("{action:?}"), "List");
4188    }
4189
4190    #[test]
4191    fn test_cli_parse_send_alias() {
4192        let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
4193        assert!(matches!(
4194            &cli.command,
4195            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
4196        ));
4197    }
4198
4199    #[test]
4200    fn test_cli_parse_spawn_no_name() {
4201        let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
4202        assert!(matches!(
4203            &cli.command,
4204            Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
4205        ));
4206    }
4207
4208    #[test]
4209    fn test_cli_parse_spawn_optional_name_with_command() {
4210        let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
4211        assert!(matches!(
4212            &cli.command,
4213            Some(Commands::Spawn { name, command, .. })
4214                if name.is_none() && command == &["echo", "hello"]
4215        ));
4216    }
4217
4218    #[test]
4219    fn test_cli_parse_path_shortcut() {
4220        let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
4221        assert!(cli.command.is_none());
4222        assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
4223    }
4224
4225    #[test]
4226    fn test_cli_parse_no_args() {
4227        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
4228        assert!(cli.command.is_none());
4229        assert!(cli.path.is_none());
4230    }
4231
4232    #[test]
4233    fn test_derive_session_name_simple() {
4234        assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
4235    }
4236
4237    #[test]
4238    fn test_derive_session_name_with_special_chars() {
4239        assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
4240    }
4241
4242    #[test]
4243    fn test_derive_session_name_root() {
4244        assert_eq!(derive_session_name("/"), "session");
4245    }
4246
4247    #[test]
4248    fn test_derive_session_name_dots() {
4249        assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
4250    }
4251
4252    #[test]
4253    fn test_resolve_path_absolute() {
4254        assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
4255    }
4256
4257    #[test]
4258    fn test_resolve_path_relative() {
4259        let resolved = resolve_path("my-repo");
4260        assert!(resolved.ends_with("my-repo"));
4261        assert!(resolved.starts_with('/'));
4262    }
4263
4264    #[tokio::test]
4265    async fn test_execute_no_args_shows_help() {
4266        let node = start_test_server().await;
4267        let cli = Cli {
4268            node,
4269            token: None,
4270            path: None,
4271            command: None,
4272        };
4273        let result = execute(&cli).await.unwrap();
4274        assert!(
4275            result.is_empty(),
4276            "no-args should return empty string after printing help"
4277        );
4278    }
4279
4280    #[tokio::test]
4281    async fn test_execute_path_shortcut() {
4282        let node = start_test_server().await;
4283        let cli = Cli {
4284            node,
4285            token: None,
4286            path: Some("/tmp".into()),
4287            command: None,
4288        };
4289        let result = execute(&cli).await.unwrap();
4290        assert!(result.contains("Detached from session"));
4291    }
4292
4293    #[tokio::test]
4294    async fn test_deduplicate_session_name_no_conflict() {
4295        // Connection refused → falls through to "name not taken" path
4296        let base = "http://127.0.0.1:1";
4297        let client = reqwest::Client::new();
4298        let name = deduplicate_session_name(&client, base, "fresh", None).await;
4299        assert_eq!(name, "fresh");
4300    }
4301
4302    #[tokio::test]
4303    async fn test_deduplicate_session_name_with_conflict() {
4304        use axum::{Router, routing::get};
4305        use std::sync::atomic::{AtomicU32, Ordering};
4306
4307        let call_count = std::sync::Arc::new(AtomicU32::new(0));
4308        let counter = call_count.clone();
4309        let app = Router::new()
4310            .route(
4311                "/api/v1/sessions/{id}",
4312                get(move || {
4313                    let c = counter.clone();
4314                    async move {
4315                        let n = c.fetch_add(1, Ordering::SeqCst);
4316                        if n == 0 {
4317                            // First call (base name) → exists
4318                            (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4319                        } else {
4320                            // Suffixed name → not found
4321                            (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4322                        }
4323                    }
4324                }),
4325            )
4326            .route(
4327                "/api/v1/peers",
4328                get(|| async {
4329                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4330                }),
4331            );
4332        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4333        let addr = listener.local_addr().unwrap();
4334        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4335        let base = format!("http://127.0.0.1:{}", addr.port());
4336        let client = reqwest::Client::new();
4337        let name = deduplicate_session_name(&client, &base, "repo", None).await;
4338        assert_eq!(name, "repo-2");
4339    }
4340
4341    // -- Node resolution tests --
4342
4343    #[test]
4344    fn test_node_needs_resolution() {
4345        assert!(!node_needs_resolution("localhost:7433"));
4346        assert!(!node_needs_resolution("mac-mini:7433"));
4347        assert!(!node_needs_resolution("10.0.0.1:7433"));
4348        assert!(!node_needs_resolution("[::1]:7433"));
4349        assert!(node_needs_resolution("mac-mini"));
4350        assert!(node_needs_resolution("linux-server"));
4351        assert!(node_needs_resolution("localhost"));
4352    }
4353
4354    #[tokio::test]
4355    async fn test_resolve_node_with_port() {
4356        let client = reqwest::Client::new();
4357        let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4358        assert_eq!(addr, "mac-mini:7433");
4359        assert!(token.is_none());
4360    }
4361
4362    #[tokio::test]
4363    async fn test_resolve_node_fallback_appends_port() {
4364        // No local daemon running on localhost:7433, so peer lookup fails
4365        // and it falls back to appending :7433
4366        let client = reqwest::Client::new();
4367        let (addr, token) = resolve_node(&client, "unknown-host").await;
4368        assert_eq!(addr, "unknown-host:7433");
4369        assert!(token.is_none());
4370    }
4371
4372    #[cfg(not(coverage))]
4373    #[tokio::test]
4374    async fn test_resolve_node_finds_peer() {
4375        use axum::{Router, routing::get};
4376
4377        let app = Router::new()
4378            .route(
4379                "/api/v1/peers",
4380                get(|| async {
4381                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4382                }),
4383            )
4384            .route(
4385                "/api/v1/config",
4386                get(|| async {
4387                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4388                }),
4389            );
4390
4391        // Port 7433 may be in use; skip test if so
4392        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4393            return;
4394        };
4395        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4396
4397        let client = reqwest::Client::new();
4398        let (addr, token) = resolve_node(&client, "mac-mini").await;
4399        assert_eq!(addr, "10.0.0.5:7433");
4400        assert_eq!(token, Some("peer-secret".into()));
4401    }
4402
4403    #[cfg(not(coverage))]
4404    #[tokio::test]
4405    async fn test_resolve_node_peer_no_token() {
4406        use axum::{Router, routing::get};
4407
4408        let app = Router::new()
4409            .route(
4410                "/api/v1/peers",
4411                get(|| async {
4412                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4413                }),
4414            )
4415            .route(
4416                "/api/v1/config",
4417                get(|| async {
4418                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4419                }),
4420            );
4421
4422        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4423            return; // Port in use, skip
4424        };
4425        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4426
4427        let client = reqwest::Client::new();
4428        let (addr, token) = resolve_node(&client, "test-peer").await;
4429        assert_eq!(addr, "10.0.0.9:7433");
4430        assert!(token.is_none()); // Simple peer entry has no token
4431    }
4432
4433    #[tokio::test]
4434    async fn test_execute_with_peer_name_resolution() {
4435        // When node doesn't contain ':', resolve_node is called.
4436        // Since there's no local daemon on port 7433, it falls back to appending :7433.
4437        // The connection to the fallback address will fail, giving us a connection error.
4438        let cli = Cli {
4439            node: "nonexistent-peer".into(),
4440            token: None,
4441            command: Some(Commands::List { all: false }),
4442            path: None,
4443        };
4444        let result = execute(&cli).await;
4445        // Should try to connect to nonexistent-peer:7433 and fail
4446        assert!(result.is_err());
4447    }
4448
4449    // -- Auto node selection tests --
4450
4451    #[test]
4452    fn test_cli_parse_spawn_auto() {
4453        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
4454        assert!(matches!(
4455            &cli.command,
4456            Some(Commands::Spawn { auto, .. }) if *auto
4457        ));
4458    }
4459
4460    #[test]
4461    fn test_cli_parse_spawn_auto_default() {
4462        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
4463        assert!(matches!(
4464            &cli.command,
4465            Some(Commands::Spawn { auto, .. }) if !auto
4466        ));
4467    }
4468
4469    #[tokio::test]
4470    async fn test_select_best_node_coverage_stub() {
4471        // Exercise the coverage stub (or real function in non-coverage builds)
4472        let client = reqwest::Client::new();
4473        // In coverage builds, the stub returns ("localhost:7433", "local")
4474        // In non-coverage builds, this fails because no server is running — that's OK
4475        let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4476    }
4477
4478    #[cfg(not(coverage))]
4479    #[tokio::test]
4480    async fn test_select_best_node_picks_least_loaded() {
4481        use axum::{Router, routing::get};
4482
4483        let app = Router::new().route(
4484            "/api/v1/peers",
4485            get(|| async {
4486                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4487            }),
4488        );
4489        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4490        let addr = listener.local_addr().unwrap();
4491        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4492        let base = format!("http://127.0.0.1:{}", addr.port());
4493
4494        let client = reqwest::Client::new();
4495        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4496        // "idle" has 1 session + 16384 MB → score = 1 - 16 = -15
4497        // "busy" has 5 sessions + 8192 MB → score = 5 - 8 = -3
4498        // idle wins (lower score)
4499        assert_eq!(name, "idle");
4500        assert_eq!(addr, "idle:7433");
4501    }
4502
4503    #[cfg(not(coverage))]
4504    #[tokio::test]
4505    async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4506        use axum::{Router, routing::get};
4507
4508        let app = Router::new().route(
4509            "/api/v1/peers",
4510            get(|| async {
4511                r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4512            }),
4513        );
4514        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4515        let addr = listener.local_addr().unwrap();
4516        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4517        let base = format!("http://127.0.0.1:{}", addr.port());
4518
4519        let client = reqwest::Client::new();
4520        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4521        assert_eq!(name, "my-mac");
4522        assert_eq!(addr, "localhost:7433");
4523    }
4524
4525    #[cfg(not(coverage))]
4526    #[tokio::test]
4527    async fn test_select_best_node_empty_peers_falls_back_to_local() {
4528        use axum::{Router, routing::get};
4529
4530        let app = Router::new().route(
4531            "/api/v1/peers",
4532            get(|| async {
4533                r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4534            }),
4535        );
4536        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4537        let addr = listener.local_addr().unwrap();
4538        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4539        let base = format!("http://127.0.0.1:{}", addr.port());
4540
4541        let client = reqwest::Client::new();
4542        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4543        assert_eq!(name, "solo");
4544        assert_eq!(addr, "localhost:7433");
4545    }
4546
4547    #[cfg(not(coverage))]
4548    #[tokio::test]
4549    async fn test_execute_spawn_auto_selects_node() {
4550        use axum::{
4551            Router,
4552            http::StatusCode,
4553            routing::{get, post},
4554        };
4555
4556        let create_json = test_create_response_json();
4557
4558        // Bind early so we know the address to embed in the peers response
4559        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4560        let addr = listener.local_addr().unwrap();
4561        let node = format!("127.0.0.1:{}", addr.port());
4562        let peer_addr = node.clone();
4563
4564        let app = Router::new()
4565            .route(
4566                "/api/v1/peers",
4567                get(move || {
4568                    let peer_addr = peer_addr.clone();
4569                    async move {
4570                        format!(
4571                            r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
4572                        )
4573                    }
4574                }),
4575            )
4576            .route(
4577                "/api/v1/sessions",
4578                post(move || async move {
4579                    (StatusCode::CREATED, create_json.clone())
4580                }),
4581            );
4582
4583        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4584
4585        let cli = Cli {
4586            node,
4587            token: None,
4588            command: Some(Commands::Spawn {
4589                name: Some("test".into()),
4590                workdir: Some("/tmp/repo".into()),
4591                ink: None,
4592                description: None,
4593                detach: true,
4594                idle_threshold: None,
4595                auto: true,
4596                worktree: false,
4597                worktree_base: None,
4598                runtime: None,
4599                secret: vec![],
4600                command: vec!["echo".into(), "hello".into()],
4601            }),
4602            path: None,
4603        };
4604        let result = execute(&cli).await.unwrap();
4605        assert!(result.contains("Created session"));
4606    }
4607
4608    #[cfg(not(coverage))]
4609    #[tokio::test]
4610    async fn test_select_best_node_peer_no_session_count() {
4611        use axum::{Router, routing::get};
4612
4613        let app = Router::new().route(
4614            "/api/v1/peers",
4615            get(|| async {
4616                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4617            }),
4618        );
4619        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4620        let addr = listener.local_addr().unwrap();
4621        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4622        let base = format!("http://127.0.0.1:{}", addr.port());
4623
4624        let client = reqwest::Client::new();
4625        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4626        // Online peer with no session_count (0) and no node_info (0 mem) → score = 0
4627        assert_eq!(name, "fresh");
4628        assert_eq!(addr, "fresh:7433");
4629    }
4630
4631    // -- Secret CLI parse tests --
4632
4633    #[test]
4634    fn test_cli_parse_secret_set() {
4635        let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_TOKEN", "abc123"]).unwrap();
4636        assert!(matches!(
4637            &cli.command,
4638            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
4639                if name == "MY_TOKEN" && value == "abc123" && env.is_none()
4640        ));
4641    }
4642
4643    #[test]
4644    fn test_cli_parse_secret_list() {
4645        let cli = Cli::try_parse_from(["pulpo", "secret", "list"]).unwrap();
4646        assert!(matches!(
4647            &cli.command,
4648            Some(Commands::Secret {
4649                action: SecretAction::List
4650            })
4651        ));
4652    }
4653
4654    #[test]
4655    fn test_cli_parse_secret_list_alias() {
4656        let cli = Cli::try_parse_from(["pulpo", "secret", "ls"]).unwrap();
4657        assert!(matches!(
4658            &cli.command,
4659            Some(Commands::Secret {
4660                action: SecretAction::List
4661            })
4662        ));
4663    }
4664
4665    #[test]
4666    fn test_cli_parse_secret_delete() {
4667        let cli = Cli::try_parse_from(["pulpo", "secret", "delete", "MY_TOKEN"]).unwrap();
4668        assert!(matches!(
4669            &cli.command,
4670            Some(Commands::Secret { action: SecretAction::Delete { name } })
4671                if name == "MY_TOKEN"
4672        ));
4673    }
4674
4675    #[test]
4676    fn test_cli_parse_secret_delete_alias() {
4677        let cli = Cli::try_parse_from(["pulpo", "secret", "rm", "MY_TOKEN"]).unwrap();
4678        assert!(matches!(
4679            &cli.command,
4680            Some(Commands::Secret { action: SecretAction::Delete { name } })
4681                if name == "MY_TOKEN"
4682        ));
4683    }
4684
4685    #[test]
4686    fn test_cli_parse_secret_alias() {
4687        let cli = Cli::try_parse_from(["pulpo", "sec", "list"]).unwrap();
4688        assert!(matches!(
4689            &cli.command,
4690            Some(Commands::Secret {
4691                action: SecretAction::List
4692            })
4693        ));
4694    }
4695
4696    #[test]
4697    fn test_format_secrets_empty() {
4698        let secrets: Vec<serde_json::Value> = vec![];
4699        assert_eq!(format_secrets(&secrets), "No secrets configured.");
4700    }
4701
4702    #[test]
4703    fn test_format_secrets_with_entries() {
4704        let secrets = vec![
4705            serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4706            serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4707        ];
4708        let output = format_secrets(&secrets);
4709        assert!(output.contains("GITHUB_TOKEN"));
4710        assert!(output.contains("NPM_TOKEN"));
4711        assert!(output.contains("NAME"));
4712        assert!(output.contains("ENV"));
4713        assert!(output.contains("CREATED"));
4714    }
4715
4716    #[test]
4717    fn test_format_secrets_with_env() {
4718        let secrets = vec![
4719            serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4720            serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4721        ];
4722        let output = format_secrets(&secrets);
4723        assert!(output.contains("GH_WORK"));
4724        assert!(output.contains("GITHUB_TOKEN"));
4725        assert!(output.contains("NPM_TOKEN"));
4726    }
4727
4728    #[test]
4729    fn test_format_secrets_short_timestamp() {
4730        let secrets = vec![serde_json::json!({"name": "KEY", "created_at": "now"})];
4731        let output = format_secrets(&secrets);
4732        assert!(output.contains("now"));
4733    }
4734
4735    #[test]
4736    fn test_format_schedules_short_last_run_at() {
4737        // Regression: last_run_at shorter than 16 chars must not panic
4738        let schedules = vec![serde_json::json!({
4739            "name": "test",
4740            "cron": "* * * * *",
4741            "enabled": true,
4742            "last_run_at": "short",
4743            "target_node": null
4744        })];
4745        let output = format_schedules(&schedules);
4746        assert!(output.contains("short"));
4747    }
4748
4749    #[test]
4750    fn test_format_sessions_multibyte_command_truncation() {
4751        use chrono::Utc;
4752        use pulpo_common::session::SessionStatus;
4753        use uuid::Uuid;
4754
4755        // Command with multi-byte chars exceeding 50 bytes; must not panic
4756        let sessions = vec![Session {
4757            id: Uuid::nil(),
4758            name: "test".into(),
4759            workdir: "/tmp".into(),
4760            command: "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'".into(),
4761            description: None,
4762            status: SessionStatus::Active,
4763            exit_code: None,
4764            backend_session_id: None,
4765            output_snapshot: None,
4766            metadata: None,
4767            ink: None,
4768            intervention_code: None,
4769            intervention_reason: None,
4770            intervention_at: None,
4771            last_output_at: None,
4772            idle_since: None,
4773            idle_threshold_secs: None,
4774            worktree_path: None,
4775            worktree_branch: None,
4776            runtime: Runtime::Tmux,
4777            created_at: Utc::now(),
4778            updated_at: Utc::now(),
4779        }];
4780        let output = format_sessions(&sessions);
4781        assert!(output.contains("..."));
4782    }
4783}