Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5    AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6    PeersResponse,
7};
8use pulpo_common::session::{Runtime, Session};
9
// Top-level command-line definition, parsed with clap's derive API.
//
// Maintainer notes in this block are plain `//` comments on purpose: clap's derive turns
// `///` doc comments on fields into user-visible help text, so those lines are left untouched.
//
// `version` uses `env!`, which is expanded at compile time — the build fails unless
// `PULPO_VERSION` is set in the build environment.
// `args_conflicts_with_subcommands` makes the top-level arguments and a subcommand
// mutually exclusive on one command line.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version = env!("PULPO_VERSION"),
    args_conflicts_with_subcommands = true
)]
pub struct Cli {
    /// Target node (default: localhost)
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    /// Auth token (auto-discovered from local daemon if omitted)
    #[arg(long)]
    pub token: Option<String>,

    // `None` when the invocation carries no subcommand (e.g. the quick-spawn form below).
    #[command(subcommand)]
    pub command: Option<Commands>,

    /// Quick spawn: `pulpo <path>` spawns a session in that directory
    #[arg(value_name = "PATH")]
    pub path: Option<String>,
}
33
// Top-level subcommands of the CLI.
//
// Maintainer notes use plain `//` comments: with clap's derive, `///` doc comments on
// variants and fields become the help text shown to users, so they are left untouched.
// `visible_alias` entries are listed in `--help`; plain `alias` entries are accepted
// but hidden.
#[derive(Subcommand, Debug)]
// NOTE(review): presumably silenced because `Spawn` carries far more fields than the other
// variants — confirm before removing.
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    /// Attach to a session's terminal
    #[command(visible_alias = "a")]
    Attach {
        /// Session name or ID
        name: String,
    },

    /// Send input to a session
    #[command(visible_alias = "i", visible_alias = "send")]
    Input {
        /// Session name or ID
        name: String,
        /// Text to send (sends Enter if omitted)
        text: Option<String>,
    },

    /// Spawn a new agent session
    #[command(visible_alias = "s")]
    Spawn {
        /// Session name (auto-generated from workdir if omitted)
        name: Option<String>,

        /// Working directory (defaults to current directory)
        #[arg(long)]
        workdir: Option<String>,

        /// Ink name (from config)
        #[arg(long)]
        ink: Option<String>,

        /// Human-readable description of the task
        #[arg(long)]
        description: Option<String>,

        /// Don't attach to the session after spawning
        #[arg(short, long)]
        detach: bool,

        /// Idle threshold in seconds (0 = never idle)
        #[arg(long)]
        idle_threshold: Option<u32>,

        /// Auto-select the least loaded node
        #[arg(long)]
        auto: bool,

        /// Create an isolated git worktree for the session
        #[arg(long)]
        worktree: bool,

        /// Runtime environment: tmux (default) or docker
        // Kept as a free-form string here; it is not validated by clap.
        #[arg(long)]
        runtime: Option<String>,

        /// Secrets to inject as environment variables (by name)
        // `Vec` field: the flag may be repeated, one secret name per occurrence.
        #[arg(long)]
        secret: Vec<String>,

        /// Command to run (everything after --)
        // `last = true`: these values are only accepted after a literal `--`.
        #[arg(last = true)]
        command: Vec<String>,
    },

    /// List sessions (live only by default)
    #[command(visible_alias = "ls")]
    List {
        /// Show all sessions including stopped and lost
        #[arg(short, long)]
        all: bool,
    },

    /// Show session logs/output
    #[command(visible_alias = "l")]
    Logs {
        /// Session name or ID
        name: String,

        /// Number of lines to fetch
        #[arg(long, default_value = "100")]
        lines: usize,

        /// Follow output (like `tail -f`)
        #[arg(short, long)]
        follow: bool,
    },

    /// Stop one or more sessions
    // `k` is shown in help; `kill` is accepted but hidden.
    #[command(visible_alias = "k", alias = "kill")]
    Stop {
        /// Session names or IDs
        #[arg(required = true)]
        names: Vec<String>,

        /// Also purge the session from history
        #[arg(long, short = 'p')]
        purge: bool,
    },

    /// Remove all stopped and lost sessions
    Cleanup,

    /// Resume a lost session
    #[command(visible_alias = "r")]
    Resume {
        /// Session name or ID
        name: String,
    },

    /// List all known nodes
    #[command(visible_alias = "n")]
    Nodes,

    /// Show intervention history for a session
    #[command(visible_alias = "iv")]
    Interventions {
        /// Session name or ID
        name: String,
    },

    /// Open the web dashboard in your browser
    Ui,

    /// Manage scheduled agent runs
    #[command(visible_alias = "sched")]
    Schedule {
        #[command(subcommand)]
        action: ScheduleAction,
    },

    /// Manage secrets (environment variables injected into sessions)
    #[command(visible_alias = "sec")]
    Secret {
        #[command(subcommand)]
        action: SecretAction,
    },
}
173
// Nested subcommands of `pulpo secret`.
//
// Maintainer notes use plain `//` comments: clap's derive turns `///` doc comments on
// variants and fields into user-visible help text, so those are left untouched.
#[derive(Subcommand, Debug)]
pub enum SecretAction {
    /// Set a secret
    Set {
        /// Secret name (will be the env var name, uppercase + underscores)
        name: String,
        /// Secret value
        // NOTE(review): taken as a positional argument, so the value is visible in the
        // process argument list and shell history.
        value: String,
        /// Environment variable name (defaults to secret name)
        #[arg(long)]
        env: Option<String>,
    },
    /// List secret names
    #[command(visible_alias = "ls")]
    List,
    /// Delete a secret
    #[command(visible_alias = "rm")]
    Delete {
        /// Secret name
        name: String,
    },
}
196
// Nested subcommands of `pulpo schedule`.
//
// Maintainer notes use plain `//` comments: clap's derive turns `///` doc comments on
// variants and fields into user-visible help text, so those are left untouched.
// The aliases in this enum use `alias` (not `visible_alias`), so they are accepted on the
// command line but not listed in `--help`.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    /// Add a new schedule
    #[command(alias = "install")]
    Add {
        /// Schedule name
        name: String,
        /// Cron expression (e.g. "0 3 * * *")
        // Passed through as a string; not parsed or validated by clap.
        cron: String,
        /// Working directory
        #[arg(long)]
        workdir: Option<String>,
        /// Target node (omit = local, "auto" = least-loaded)
        #[arg(long)]
        node: Option<String>,
        /// Ink preset
        #[arg(long)]
        ink: Option<String>,
        /// Description
        #[arg(long)]
        description: Option<String>,
        /// Command to run (everything after --)
        // `last = true`: these values are only accepted after a literal `--`.
        #[arg(last = true)]
        command: Vec<String>,
    },
    /// List all schedules
    #[command(alias = "ls")]
    List,
    /// Remove a schedule
    #[command(alias = "rm")]
    Remove {
        /// Schedule name or ID
        name: String,
    },
    /// Pause a schedule
    Pause {
        /// Schedule name or ID
        name: String,
    },
    /// Resume a paused schedule
    Resume {
        /// Schedule name or ID
        name: String,
    },
}
242
/// The marker emitted by the agent wrapper when the agent process exits.
///
/// `follow_logs` stops polling as soon as the fetched output contains this exact text,
/// so it must stay in sync with whatever prints it.
const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
245
/// Turn `path` into an absolute path string.
///
/// Absolute inputs are returned unchanged. Relative inputs are joined onto the current
/// working directory; if that directory cannot be determined, the input is returned as-is.
/// The result is not normalised: `.`/`..` components and symlinks are left untouched.
fn resolve_path(path: &str) -> String {
    let candidate = std::path::Path::new(path);
    if candidate.is_absolute() {
        return path.to_owned();
    }
    match std::env::current_dir() {
        Ok(cwd) => cwd.join(candidate).to_string_lossy().into_owned(),
        Err(_) => path.to_owned(),
    }
}
258
/// Derive a kebab-case session name from the last component of `path`.
///
/// ASCII letters and digits are kept (lowercased); every other character acts as a
/// separator. Runs of separators collapse into a single `-`, and separators at either end
/// are dropped. Falls back to `"session"` when the path has no usable final component or
/// nothing survives the filtering.
fn derive_session_name(path: &str) -> String {
    let basename = std::path::Path::new(path)
        .file_name()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("session");

    let mut slug = String::with_capacity(basename.len());
    for ch in basename.chars() {
        if ch.is_ascii_alphanumeric() {
            slug.push(ch.to_ascii_lowercase());
        } else if !slug.is_empty() && !slug.ends_with('-') {
            // First separator after real content; leading and repeated ones are skipped.
            slug.push('-');
        }
    }
    // At most one separator can be left dangling at the end.
    if slug.ends_with('-') {
        slug.pop();
    }

    if slug.is_empty() {
        return "session".to_owned();
    }
    slug
}
291
292/// Deduplicate a session name by appending `-2`, `-3`, etc. if the base name is active.
293async fn deduplicate_session_name(
294    client: &reqwest::Client,
295    base: &str,
296    name: &str,
297    token: Option<&str>,
298) -> String {
299    // Check if the name is already taken by fetching the session
300    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
301        .send()
302        .await;
303    match resp {
304        Ok(r) if r.status().is_success() => {
305            // Session exists — try suffixed names
306            for i in 2..=99 {
307                let candidate = format!("{name}-{i}");
308                let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
309                    .send()
310                    .await;
311                match resp {
312                    Ok(r) if r.status().is_success() => {}
313                    _ => return candidate,
314                }
315            }
316            format!("{name}-100")
317        }
318        _ => name.to_owned(),
319    }
320}
321
/// Build the HTTP base URL for a node address by prefixing it with `http://`.
///
/// The address is used verbatim; no validation or normalisation is performed.
pub fn base_url(node: &str) -> String {
    let mut url = String::from("http://");
    url.push_str(node);
    url
}
326
/// Response shape for the output endpoint.
///
/// Only the `output` field is read; `fetch_output` deserializes the response body into
/// this struct and returns the field.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// Captured session output as returned by the API.
    output: String,
}
332
/// Return the lowercase label for a session's runtime (`"tmux"` or `"docker"`).
///
/// Used by `format_sessions` to fill the RUNTIME column.
const fn session_runtime(session: &Session) -> &'static str {
    match session.runtime {
        Runtime::Tmux => "tmux",
        Runtime::Docker => "docker",
    }
}
340
341fn format_sessions(sessions: &[Session]) -> String {
342    if sessions.is_empty() {
343        return "No sessions.".into();
344    }
345    let mut lines = vec![format!(
346        "{:<10} {:<24} {:<12} {:<8} {}",
347        "ID", "NAME", "STATUS", "RUNTIME", "COMMAND"
348    )];
349    for s in sessions {
350        let cmd_display = if s.command.len() > 50 {
351            let truncated: String = s.command.chars().take(47).collect();
352            format!("{truncated}...")
353        } else {
354            s.command.clone()
355        };
356        let short_id = &s.id.to_string()[..8];
357        let has_pr = s
358            .metadata
359            .as_ref()
360            .is_some_and(|m| m.contains_key("pr_url"));
361        let name_display = match (s.worktree_path.is_some(), has_pr) {
362            (true, true) => format!("{} [wt] [PR]", s.name),
363            (true, false) => format!("{} [wt]", s.name),
364            (false, true) => format!("{} [PR]", s.name),
365            (false, false) => s.name.clone(),
366        };
367        lines.push(format!(
368            "{:<10} {:<24} {:<12} {:<8} {}",
369            short_id,
370            name_display,
371            s.status,
372            session_runtime(s),
373            cmd_display
374        ));
375    }
376    lines.join("\n")
377}
378
379/// Format the peers response as a table.
380fn format_nodes(resp: &PeersResponse) -> String {
381    let mut lines = vec![format!(
382        "{:<20} {:<25} {:<10} {}",
383        "NAME", "ADDRESS", "STATUS", "SESSIONS"
384    )];
385    lines.push(format!(
386        "{:<20} {:<25} {:<10} {}",
387        resp.local.name, "(local)", "online", "-"
388    ));
389    for p in &resp.peers {
390        let sessions = p
391            .session_count
392            .map_or_else(|| "-".into(), |c| c.to_string());
393        lines.push(format!(
394            "{:<20} {:<25} {:<10} {}",
395            p.name, p.address, p.status, sessions
396        ));
397    }
398    lines.join("\n")
399}
400
401/// Format intervention events as a table.
402fn format_interventions(events: &[InterventionEventResponse]) -> String {
403    if events.is_empty() {
404        return "No intervention events.".into();
405    }
406    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
407    for e in events {
408        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
409    }
410    lines.join("\n")
411}
412
/// Build (but do not run) the platform command that opens `url` in the default browser.
///
/// Uses `open` on macOS, `cmd /C start` on Windows, and `xdg-open` on Linux and every
/// other target. The URL is always the final argument.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    #[cfg(target_os = "macos")]
    let (program, leading_args): (&str, &[&str]) = ("open", &[]);
    #[cfg(target_os = "windows")]
    let (program, leading_args): (&str, &[&str]) = ("cmd", &["/C", "start"]);
    // Linux, plus the fallback for any other target.
    #[cfg(not(any(target_os = "macos", target_os = "windows")))]
    let (program, leading_args): (&str, &[&str]) = ("xdg-open", &[]);

    let mut opener = std::process::Command::new(program);
    opener.args(leading_args).arg(url);
    opener
}
442
/// Open a URL in the default browser.
///
/// Runs the command from `build_open_command` and waits for it to finish.
///
/// # Errors
///
/// Returns an error only if the opener process could not be started. The exit status is
/// discarded, so an opener that runs but exits non-zero still yields `Ok(())`.
#[cfg(not(coverage))]
fn open_browser(url: &str) -> Result<()> {
    build_open_command(url).status()?;
    Ok(())
}
449
/// Stub for coverage builds — avoids opening a browser during tests.
///
/// Ignores the URL and always returns `Ok(())`; compiled only under `cfg(coverage)`.
#[cfg(coverage)]
fn open_browser(_url: &str) -> Result<()> {
    Ok(())
}
455
/// Build (but do not run) the command that attaches the terminal to a session.
///
/// A backend session ID of the form `docker:<container>` yields
/// `docker exec -it <container> /bin/sh`. Any other ID is treated as a tmux target and
/// yields `tmux attach-session -t <id>`, except on Windows, where tmux is unavailable and
/// the command merely echoes an explanatory message.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    let argv: Vec<&str> = if let Some(container) = backend_session_id.strip_prefix("docker:") {
        vec!["docker", "exec", "-it", container, "/bin/sh"]
    } else if cfg!(target_os = "windows") {
        vec![
            "cmd",
            "/C",
            "echo",
            "Attach not available on Windows. Use the web UI or --runtime docker.",
        ]
    } else {
        vec!["tmux", "attach-session", "-t", backend_session_id]
    };

    // The first element is the program; everything after it is passed as arguments.
    let mut attach = std::process::Command::new(argv[0]);
    attach.args(&argv[1..]);
    attach
}
485
486/// Attach to a session's terminal.
487#[cfg(not(any(test, coverage, target_os = "windows")))]
488fn attach_session(backend_session_id: &str) -> Result<()> {
489    let status = build_attach_command(backend_session_id).status()?;
490    if !status.success() {
491        anyhow::bail!("attach failed with {status}");
492    }
493    Ok(())
494}
495
/// Stub for Windows — tmux attach is not available.
///
/// Prints a hint to stderr and returns `Ok(())` without running anything; the session ID
/// is ignored. Compiled only for Windows builds that are neither test nor coverage builds.
#[cfg(all(target_os = "windows", not(test), not(coverage)))]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
    Ok(())
}
502
/// Stub for test and coverage builds — avoids spawning real terminals during tests.
///
/// Ignores the session ID and always returns `Ok(())`. The clippy allowances exist because
/// the signature has to match the real implementation even though this body is infallible
/// and could otherwise be a `const fn`.
#[cfg(any(test, coverage))]
#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    Ok(())
}
509
510/// Extract a clean error message from an API JSON response (or fall back to raw text).
511fn api_error(text: &str) -> anyhow::Error {
512    serde_json::from_str::<serde_json::Value>(text)
513        .ok()
514        .and_then(|v| v["error"].as_str().map(String::from))
515        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
516}
517
518/// Return the response body text, or a clean error if the response was non-success.
519async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
520    if resp.status().is_success() {
521        Ok(resp.text().await?)
522    } else {
523        let text = resp.text().await?;
524        Err(api_error(&text))
525    }
526}
527
528/// Map a reqwest error to a user-friendly message.
529fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
530    if err.is_connect() {
531        anyhow::anyhow!(
532            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
533        )
534    } else {
535        anyhow::anyhow!("Network error connecting to {node}: {err}")
536    }
537}
538
/// Report whether a node address refers to the local machine.
///
/// Recognises `localhost` and `127.0.0.1` (with or without a `:port` suffix), the bare
/// IPv6 loopback `::1`, and the bracketed form `[::1]` (with or without a port). The
/// comparison is exact and case-sensitive.
fn is_localhost(node: &str) -> bool {
    // Everything before the first ':' — or the whole string when there is none.
    let host = node.split_once(':').map_or(node, |(before, _)| before);
    matches!(host, "localhost" | "127.0.0.1") || node == "::1" || node.starts_with("[::1]")
}
544
545/// Try to auto-discover the auth token from a local daemon.
546async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
547    let resp = client
548        .get(format!("{base}/api/v1/auth/token"))
549        .send()
550        .await
551        .ok()?;
552    let body: AuthTokenResponse = resp.json().await.ok()?;
553    if body.token.is_empty() {
554        None
555    } else {
556        Some(body.token)
557    }
558}
559
560/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
561async fn resolve_token(
562    client: &reqwest::Client,
563    base: &str,
564    node: &str,
565    explicit: Option<&str>,
566) -> Option<String> {
567    if let Some(t) = explicit {
568        return Some(t.to_owned());
569    }
570    if is_localhost(node) {
571        return discover_token(client, base).await;
572    }
573    None
574}
575
/// Report whether `node` is a bare name that still has to be resolved to `host:port`.
///
/// Any string containing a `:` is assumed to carry a port already (this includes IPv6
/// literals), so only colon-free strings need resolution.
fn node_needs_resolution(node: &str) -> bool {
    node.find(':').is_none()
}
580
581/// Resolve a node reference to a `host:port` address.
582///
583/// If `node` looks like `host:port` (contains `:`), return as-is with no peer token.
584/// Otherwise, query the local daemon's peer registry for a matching name. If a matching
585/// online peer is found, return its address and optionally its configured auth token
586/// (from the config endpoint). Falls back to appending `:7433` if the peer is not found.
587#[cfg(not(coverage))]
588async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
589    // Already has port — use as-is
590    if !node_needs_resolution(node) {
591        return (node.to_owned(), None);
592    }
593
594    // Try to resolve via local daemon's peer registry
595    let local_base = "http://localhost:7433";
596    let mut resolved_address: Option<String> = None;
597
598    if let Ok(resp) = client
599        .get(format!("{local_base}/api/v1/peers"))
600        .send()
601        .await
602        && let Ok(peers_resp) = resp.json::<PeersResponse>().await
603    {
604        for peer in &peers_resp.peers {
605            if peer.name == node {
606                resolved_address = Some(peer.address.clone());
607                break;
608            }
609        }
610    }
611
612    let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
613
614    // Try to get the peer's auth token from the config endpoint
615    let peer_token = if let Ok(resp) = client
616        .get(format!("{local_base}/api/v1/config"))
617        .send()
618        .await
619        && let Ok(config) = resp.json::<ConfigResponse>().await
620        && let Some(entry) = config.peers.get(node)
621    {
622        entry.token().map(String::from)
623    } else {
624        None
625    };
626
627    (address, peer_token)
628}
629
/// Coverage-build replacement for node resolution that performs no HTTP requests.
///
/// Bare names get the default port `7433` appended; addresses that already contain `:`
/// are returned unchanged. The peer token is always `None`.
#[cfg(coverage)]
async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
    let address = if node_needs_resolution(node) {
        format!("{node}:7433")
    } else {
        node.to_owned()
    };
    (address, None)
}
639
640/// Build an authenticated GET request.
641fn authed_get(
642    client: &reqwest::Client,
643    url: String,
644    token: Option<&str>,
645) -> reqwest::RequestBuilder {
646    let req = client.get(url);
647    if let Some(t) = token {
648        req.bearer_auth(t)
649    } else {
650        req
651    }
652}
653
654/// Build an authenticated POST request.
655fn authed_post(
656    client: &reqwest::Client,
657    url: String,
658    token: Option<&str>,
659) -> reqwest::RequestBuilder {
660    let req = client.post(url);
661    if let Some(t) = token {
662        req.bearer_auth(t)
663    } else {
664        req
665    }
666}
667
668/// Build an authenticated DELETE request.
669fn authed_delete(
670    client: &reqwest::Client,
671    url: String,
672    token: Option<&str>,
673) -> reqwest::RequestBuilder {
674    let req = client.delete(url);
675    if let Some(t) = token {
676        req.bearer_auth(t)
677    } else {
678        req
679    }
680}
681
682/// Build an authenticated PUT request.
683#[cfg(not(coverage))]
684fn authed_put(
685    client: &reqwest::Client,
686    url: String,
687    token: Option<&str>,
688) -> reqwest::RequestBuilder {
689    let req = client.put(url);
690    if let Some(t) = token {
691        req.bearer_auth(t)
692    } else {
693        req
694    }
695}
696
697/// Fetch session output from the API.
698async fn fetch_output(
699    client: &reqwest::Client,
700    base: &str,
701    name: &str,
702    lines: usize,
703    token: Option<&str>,
704) -> Result<String> {
705    let resp = authed_get(
706        client,
707        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
708        token,
709    )
710    .send()
711    .await?;
712    let text = ok_or_api_error(resp).await?;
713    let output: OutputResponse = serde_json::from_str(&text)?;
714    Ok(output.output)
715}
716
717/// Fetch session status from the API.
718async fn fetch_session_status(
719    client: &reqwest::Client,
720    base: &str,
721    name: &str,
722    token: Option<&str>,
723) -> Result<String> {
724    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
725        .send()
726        .await?;
727    let text = ok_or_api_error(resp).await?;
728    let session: Session = serde_json::from_str(&text)?;
729    Ok(session.status.to_string())
730}
731
732/// Wait for the session to leave "creating" state, then check if it died instantly.
733/// Uses the session ID (not name) to avoid matching old stopped sessions with the same name.
734/// Returns an error with a helpful message if the session is lost/stopped.
735async fn check_session_alive(
736    client: &reqwest::Client,
737    base: &str,
738    session_id: &str,
739    token: Option<&str>,
740) -> Result<()> {
741    // Poll up to 3 times at 500ms intervals — handles slow daemons and Docker pull delays
742    for _ in 0..3 {
743        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
744        // Fetch by ID to avoid name collisions with old sessions
745        let resp = authed_get(
746            client,
747            format!("{base}/api/v1/sessions/{session_id}"),
748            token,
749        )
750        .send()
751        .await;
752        if let Ok(resp) = resp
753            && let Ok(text) = ok_or_api_error(resp).await
754            && let Ok(session) = serde_json::from_str::<Session>(&text)
755        {
756            match session.status.to_string().as_str() {
757                "creating" => continue,
758                "lost" | "stopped" => {
759                    anyhow::bail!(
760                        "Session \"{}\" exited immediately — the command may have failed.\n  Check logs: pulpo logs {}",
761                        session.name,
762                        session.name
763                    );
764                }
765                _ => return Ok(()),
766            }
767        }
768        // fetch failed — don't block, proceed to attach
769        break;
770    }
771    Ok(())
772}
773
/// Compute the trailing part of `new` that was not already present in `prev`.
///
/// The output endpoint returns the last N lines of the terminal pane, so as new lines
/// appear, old ones scroll off the top. This looks for the latest position in `new` where
/// the tail of `prev` lines up line-for-line, and returns everything in `new` after that
/// overlap.
///
/// * `prev` empty — all of `new` is returned.
/// * `new` has no lines, or ends exactly where `prev` ended — `""` is returned.
/// * no overlap found — the output changed completely, so all of `new` is returned.
///
/// Lines are compared without their terminators, and the returned slice is cut on real
/// line boundaries, so both `\n` and `\r\n` terminated output are handled.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // A non-empty `prev` always yields at least one line; treat the impossible case as
    // "nothing to compare against".
    let Some(&last_prev) = prev_lines.last() else {
        return new;
    };

    // Scan from the bottom so the latest occurrence of prev's last line wins.
    for (i, line) in new_lines.iter().enumerate().rev() {
        if *line != last_prev {
            continue;
        }
        // Verify a contiguous overlap: the lines leading up to the match must agree too.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail != new_overlap {
            continue;
        }
        if i + 1 == new_lines.len() {
            // The overlap reaches the end of `new`: nothing new to print.
            return "";
        }
        // Byte length of the first `i + 1` lines *including* their terminators.
        // `split_inclusive('\n')` yields the same lines as `lines()` with the terminator
        // attached, so this is exact for "\r\n" as well as "\n".
        let consumed: usize = new
            .split_inclusive('\n')
            .take(i + 1)
            .map(str::len)
            .sum();
        return new.get(consumed..).unwrap_or("");
    }

    // No overlap found — output changed completely, print it all
    new
}
816
817/// Follow logs by polling, printing only new output. Returns when the session ends.
818async fn follow_logs(
819    client: &reqwest::Client,
820    base: &str,
821    name: &str,
822    lines: usize,
823    token: Option<&str>,
824    writer: &mut (dyn std::io::Write + Send),
825) -> Result<()> {
826    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
827    write!(writer, "{prev_output}")?;
828
829    let mut unchanged_ticks: u32 = 0;
830
831    loop {
832        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
833
834        // Fetch latest output
835        let new_output = fetch_output(client, base, name, lines, token).await?;
836
837        let diff = diff_output(&prev_output, &new_output);
838        if diff.is_empty() {
839            unchanged_ticks += 1;
840        } else {
841            write!(writer, "{diff}")?;
842            unchanged_ticks = 0;
843        }
844
845        // Check for agent exit marker in output
846        if new_output.contains(AGENT_EXIT_MARKER) {
847            break;
848        }
849
850        prev_output = new_output;
851
852        // Only check session status when output has been unchanged for 3+ ticks
853        if unchanged_ticks >= 3 {
854            let status = fetch_session_status(client, base, name, token).await?;
855            let is_terminal = status == "ready" || status == "stopped" || status == "lost";
856            if is_terminal {
857                break;
858            }
859        }
860    }
861    Ok(())
862}
863
864// --- Schedule API ---
865
866/// Execute a schedule subcommand via the scheduler API.
867#[cfg(not(coverage))]
868async fn execute_schedule(
869    client: &reqwest::Client,
870    action: &ScheduleAction,
871    base: &str,
872    token: Option<&str>,
873) -> Result<String> {
874    match action {
875        ScheduleAction::Add {
876            name,
877            cron,
878            workdir,
879            node,
880            ink,
881            description,
882            command,
883        } => {
884            let cmd = if command.is_empty() {
885                None
886            } else {
887                Some(command.join(" "))
888            };
889            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
890                std::env::current_dir()
891                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
892            });
893            let mut body = serde_json::json!({
894                "name": name,
895                "cron": cron,
896                "workdir": resolved_workdir,
897            });
898            if let Some(c) = &cmd {
899                body["command"] = serde_json::json!(c);
900            }
901            if let Some(n) = node {
902                body["target_node"] = serde_json::json!(n);
903            }
904            if let Some(i) = ink {
905                body["ink"] = serde_json::json!(i);
906            }
907            if let Some(d) = description {
908                body["description"] = serde_json::json!(d);
909            }
910            let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
911                .json(&body)
912                .send()
913                .await?;
914            ok_or_api_error(resp).await?;
915            Ok(format!("Created schedule \"{name}\""))
916        }
917        ScheduleAction::List => {
918            let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
919                .send()
920                .await?;
921            let text = ok_or_api_error(resp).await?;
922            let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
923            Ok(format_schedules(&schedules))
924        }
925        ScheduleAction::Remove { name } => {
926            let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
927                .send()
928                .await?;
929            ok_or_api_error(resp).await?;
930            Ok(format!("Removed schedule \"{name}\""))
931        }
932        ScheduleAction::Pause { name } => {
933            let body = serde_json::json!({ "enabled": false });
934            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
935                .json(&body)
936                .send()
937                .await?;
938            ok_or_api_error(resp).await?;
939            Ok(format!("Paused schedule \"{name}\""))
940        }
941        ScheduleAction::Resume { name } => {
942            let body = serde_json::json!({ "enabled": true });
943            let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
944                .json(&body)
945                .send()
946                .await?;
947            ok_or_api_error(resp).await?;
948            Ok(format!("Resumed schedule \"{name}\""))
949        }
950    }
951}
952
/// Coverage stub for schedule execution.
///
/// Ignores every argument, performs no HTTP requests, and always returns an empty string.
/// The `unnecessary_wraps` allowance is needed because the signature has to match the real
/// implementation even though this body cannot fail.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_schedule(
    _client: &reqwest::Client,
    _action: &ScheduleAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::new())
}
964
965// --- Secret API ---
966
967/// Format secret entries as a table.
968#[cfg_attr(coverage, allow(dead_code))]
969fn format_secrets(secrets: &[serde_json::Value]) -> String {
970    if secrets.is_empty() {
971        return "No secrets configured.".into();
972    }
973    let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
974    for s in secrets {
975        let name = s["name"].as_str().unwrap_or("?");
976        let env_display = s["env"]
977            .as_str()
978            .map_or_else(|| name.to_owned(), String::from);
979        let created = s["created_at"]
980            .as_str()
981            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
982        lines.push(format!("{name:<24} {env_display:<24} {created}"));
983    }
984    lines.join("\n")
985}
986
987/// Execute a secret subcommand via the secrets API.
988#[cfg(not(coverage))]
989async fn execute_secret(
990    client: &reqwest::Client,
991    action: &SecretAction,
992    base: &str,
993    token: Option<&str>,
994) -> Result<String> {
995    match action {
996        SecretAction::Set { name, value, env } => {
997            let mut body = serde_json::json!({ "value": value });
998            if let Some(e) = env {
999                body["env"] = serde_json::json!(e);
1000            }
1001            let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
1002                .json(&body)
1003                .send()
1004                .await?;
1005            ok_or_api_error(resp).await?;
1006            Ok(format!("Secret \"{name}\" set."))
1007        }
1008        SecretAction::List => {
1009            let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
1010                .send()
1011                .await?;
1012            let text = ok_or_api_error(resp).await?;
1013            let parsed: serde_json::Value = serde_json::from_str(&text)?;
1014            let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1015            Ok(format_secrets(secrets))
1016        }
1017        SecretAction::Delete { name } => {
1018            let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1019                .send()
1020                .await?;
1021            ok_or_api_error(resp).await?;
1022            Ok(format!("Secret \"{name}\" deleted."))
1023        }
1024    }
1025}
1026
/// Stand-in for secret execution in coverage builds.
///
/// All arguments are ignored; the call always succeeds with an empty message.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_secret(
    _client: &reqwest::Client,
    _action: &SecretAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    let no_output = String::new();
    Ok(no_output)
}
1038
1039/// Format a list of schedules as a table.
1040#[cfg_attr(coverage, allow(dead_code))]
1041fn format_schedules(schedules: &[serde_json::Value]) -> String {
1042    if schedules.is_empty() {
1043        return "No schedules.".into();
1044    }
1045    let mut lines = vec![format!(
1046        "{:<20} {:<18} {:<8} {:<12} {}",
1047        "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1048    )];
1049    for s in schedules {
1050        let name = s["name"].as_str().unwrap_or("?");
1051        let cron = s["cron"].as_str().unwrap_or("?");
1052        let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1053            "yes"
1054        } else {
1055            "no"
1056        };
1057        let last_run = s["last_run_at"]
1058            .as_str()
1059            .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1060        let node = s["target_node"].as_str().unwrap_or("local");
1061        lines.push(format!(
1062            "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1063        ));
1064    }
1065    lines.join("\n")
1066}
1067
1068/// Select the best node from the peer registry based on load.
1069/// Returns the node address and name of the least loaded online peer.
1070/// Scoring: lower memory usage + fewer active sessions = better.
1071#[cfg(not(coverage))]
1072async fn select_best_node(
1073    client: &reqwest::Client,
1074    base: &str,
1075    token: Option<&str>,
1076) -> Result<(String, String)> {
1077    let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1078        .send()
1079        .await?;
1080    let text = ok_or_api_error(resp).await?;
1081    let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1082
1083    // Score: fewer active sessions is better, more memory is better
1084    let mut best: Option<(String, String, f64)> = None; // (address, name, score)
1085
1086    for peer in &peers_resp.peers {
1087        if peer.status != pulpo_common::peer::PeerStatus::Online {
1088            continue;
1089        }
1090        let sessions = peer.session_count.unwrap_or(0);
1091        let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1092        // Lower score = better (fewer sessions, more memory)
1093        #[allow(clippy::cast_precision_loss)]
1094        let score = sessions as f64 - (mem as f64 / 1024.0);
1095        if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1096            best = Some((peer.address.clone(), peer.name.clone(), score));
1097        }
1098    }
1099
1100    // Fall back to local if no online peers
1101    match best {
1102        Some((addr, name, _)) => Ok((addr, name)),
1103        None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1104    }
1105}
1106
/// Stand-in for node auto-selection in coverage builds.
///
/// All arguments are ignored; the result is always the fixed pair
/// `("localhost:7433", "local")`.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn select_best_node(
    _client: &reqwest::Client,
    _base: &str,
    _token: Option<&str>,
) -> Result<(String, String)> {
    let address = String::from("localhost:7433");
    let name = String::from("local");
    Ok((address, name))
}
1117
1118/// Execute the given CLI command against the specified node.
1119#[allow(clippy::too_many_lines)]
1120pub async fn execute(cli: &Cli) -> Result<String> {
1121    let client = reqwest::Client::new();
1122    let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
1123    let url = base_url(&resolved_node);
1124    let node = &resolved_node;
1125    let token = resolve_token(&client, &url, node, cli.token.as_deref())
1126        .await
1127        .or(peer_token);
1128
1129    // Handle `pulpo <path>` shortcut — spawn a session in the given directory
1130    if cli.command.is_none() && cli.path.is_none() {
1131        // No subcommand and no path: print help
1132        use clap::CommandFactory;
1133        let mut cmd = Cli::command();
1134        cmd.print_help()?;
1135        println!();
1136        return Ok(String::new());
1137    }
1138    if cli.command.is_none() {
1139        let path = cli.path.as_deref().unwrap_or(".");
1140        let resolved_workdir = resolve_path(path);
1141        let base_name = derive_session_name(&resolved_workdir);
1142        let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
1143        let body = serde_json::json!({
1144            "name": name,
1145            "workdir": resolved_workdir,
1146        });
1147        let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1148            .json(&body)
1149            .send()
1150            .await
1151            .map_err(|e| friendly_error(&e, node))?;
1152        let text = ok_or_api_error(resp).await?;
1153        let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1154        let msg = format!(
1155            "Created session \"{}\" ({})",
1156            resp.session.name, resp.session.id
1157        );
1158        let backend_id = resp
1159            .session
1160            .backend_session_id
1161            .as_deref()
1162            .unwrap_or(&resp.session.name);
1163        eprintln!("{msg}");
1164        // Path shortcut spawns a shell (no command) — skip liveness check
1165        // since shell sessions are immediately detected as idle by the watchdog
1166        attach_session(backend_id)?;
1167        return Ok(format!("Detached from session \"{}\".", resp.session.name));
1168    }
1169
1170    match cli.command.as_ref().unwrap() {
1171        Commands::Attach { name } => {
1172            // Fetch session to get status and backend_session_id
1173            let resp = authed_get(
1174                &client,
1175                format!("{url}/api/v1/sessions/{name}"),
1176                token.as_deref(),
1177            )
1178            .send()
1179            .await
1180            .map_err(|e| friendly_error(&e, node))?;
1181            let text = ok_or_api_error(resp).await?;
1182            let session: Session = serde_json::from_str(&text)?;
1183            match session.status.to_string().as_str() {
1184                "lost" => {
1185                    anyhow::bail!(
1186                        "Session \"{name}\" is lost (agent process died). Resume it first:\n  pulpo resume {name}"
1187                    );
1188                }
1189                "stopped" => {
1190                    anyhow::bail!(
1191                        "Session \"{name}\" is {} — cannot attach to a stopped session.",
1192                        session.status
1193                    );
1194                }
1195                _ => {}
1196            }
1197            let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1198            attach_session(&backend_id)?;
1199            Ok(format!("Detached from session {name}."))
1200        }
1201        Commands::Input { name, text } => {
1202            let input_text = text.as_deref().unwrap_or("\n");
1203            let body = serde_json::json!({ "text": input_text });
1204            let resp = authed_post(
1205                &client,
1206                format!("{url}/api/v1/sessions/{name}/input"),
1207                token.as_deref(),
1208            )
1209            .json(&body)
1210            .send()
1211            .await
1212            .map_err(|e| friendly_error(&e, node))?;
1213            ok_or_api_error(resp).await?;
1214            Ok(format!("Sent input to session {name}."))
1215        }
1216        Commands::List { all } => {
1217            let list_url = if *all {
1218                format!("{url}/api/v1/sessions")
1219            } else {
1220                format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
1221            };
1222            let resp = authed_get(&client, list_url, token.as_deref())
1223                .send()
1224                .await
1225                .map_err(|e| friendly_error(&e, node))?;
1226            let text = ok_or_api_error(resp).await?;
1227            let sessions: Vec<Session> = serde_json::from_str(&text)?;
1228            Ok(format_sessions(&sessions))
1229        }
1230        Commands::Nodes => {
1231            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1232                .send()
1233                .await
1234                .map_err(|e| friendly_error(&e, node))?;
1235            let text = ok_or_api_error(resp).await?;
1236            let resp: PeersResponse = serde_json::from_str(&text)?;
1237            Ok(format_nodes(&resp))
1238        }
1239        Commands::Spawn {
1240            workdir,
1241            name,
1242            ink,
1243            description,
1244            detach,
1245            idle_threshold,
1246            auto,
1247            worktree,
1248            runtime,
1249            secret,
1250            command,
1251        } => {
1252            let cmd = if command.is_empty() {
1253                None
1254            } else {
1255                Some(command.join(" "))
1256            };
1257            // Resolve workdir: --workdir flag > current directory
1258            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1259                std::env::current_dir()
1260                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1261            });
1262            // Resolve name: explicit > derived from workdir (with dedup)
1263            let resolved_name = if let Some(n) = name {
1264                n.clone()
1265            } else {
1266                let base_name = derive_session_name(&resolved_workdir);
1267                deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1268            };
1269            let mut body = serde_json::json!({
1270                "name": resolved_name,
1271                "workdir": resolved_workdir,
1272            });
1273            if let Some(c) = &cmd {
1274                body["command"] = serde_json::json!(c);
1275            }
1276            if let Some(i) = ink {
1277                body["ink"] = serde_json::json!(i);
1278            }
1279            if let Some(d) = description {
1280                body["description"] = serde_json::json!(d);
1281            }
1282            if let Some(t) = idle_threshold {
1283                body["idle_threshold_secs"] = serde_json::json!(t);
1284            }
1285            if *worktree {
1286                body["worktree"] = serde_json::json!(true);
1287                eprintln!(
1288                    "Worktree: branch pulpo/{resolved_name} in ~/.pulpo/worktrees/{resolved_name}/"
1289                );
1290            }
1291            if let Some(rt) = runtime {
1292                body["runtime"] = serde_json::json!(rt);
1293            }
1294            if !secret.is_empty() {
1295                body["secrets"] = serde_json::json!(secret);
1296            }
1297            let spawn_url = if *auto {
1298                let (auto_addr, auto_name) =
1299                    select_best_node(&client, &url, token.as_deref()).await?;
1300                eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1301                base_url(&auto_addr)
1302            } else {
1303                url.clone()
1304            };
1305            let resp = authed_post(
1306                &client,
1307                format!("{spawn_url}/api/v1/sessions"),
1308                token.as_deref(),
1309            )
1310            .json(&body)
1311            .send()
1312            .await
1313            .map_err(|e| friendly_error(&e, node))?;
1314            let text = ok_or_api_error(resp).await?;
1315            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1316            let msg = format!(
1317                "Created session \"{}\" ({})",
1318                resp.session.name, resp.session.id
1319            );
1320            if !detach {
1321                let backend_id = resp
1322                    .session
1323                    .backend_session_id
1324                    .as_deref()
1325                    .unwrap_or(&resp.session.name);
1326                eprintln!("{msg}");
1327                // Only check liveness for explicit commands — shell sessions (no command)
1328                // may be immediately marked idle/stopped by the watchdog, which is expected
1329                if cmd.is_some() {
1330                    let sid = resp.session.id.to_string();
1331                    check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1332                }
1333                attach_session(backend_id)?;
1334                return Ok(format!("Detached from session \"{}\".", resp.session.name));
1335            }
1336            Ok(msg)
1337        }
1338        Commands::Stop { names, purge } => {
1339            let mut results = Vec::new();
1340            for name in names {
1341                let query = if *purge { "?purge=true" } else { "" };
1342                let resp = authed_post(
1343                    &client,
1344                    format!("{url}/api/v1/sessions/{name}/stop{query}"),
1345                    token.as_deref(),
1346                )
1347                .send()
1348                .await
1349                .map_err(|e| friendly_error(&e, node))?;
1350                let action = if *purge {
1351                    "stopped and purged"
1352                } else {
1353                    "stopped"
1354                };
1355                match ok_or_api_error(resp).await {
1356                    Ok(_) => results.push(format!("Session {name} {action}.")),
1357                    Err(e) => results.push(format!("Error stopping {name}: {e}")),
1358                }
1359            }
1360            Ok(results.join("\n"))
1361        }
1362        Commands::Cleanup => {
1363            let resp = authed_post(
1364                &client,
1365                format!("{url}/api/v1/sessions/cleanup"),
1366                token.as_deref(),
1367            )
1368            .send()
1369            .await
1370            .map_err(|e| friendly_error(&e, node))?;
1371            let text = ok_or_api_error(resp).await?;
1372            let result: serde_json::Value = serde_json::from_str(&text)?;
1373            let count = result["deleted"].as_u64().unwrap_or(0);
1374            if count == 0 {
1375                Ok("No stopped or lost sessions to clean up.".into())
1376            } else {
1377                Ok(format!("Cleaned up {count} session(s)."))
1378            }
1379        }
1380        Commands::Logs {
1381            name,
1382            lines,
1383            follow,
1384        } => {
1385            if *follow {
1386                let mut stdout = std::io::stdout();
1387                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1388                    .await
1389                    .map_err(|e| {
1390                        // Unwrap reqwest errors to friendly messages
1391                        match e.downcast::<reqwest::Error>() {
1392                            Ok(re) => friendly_error(&re, node),
1393                            Err(other) => other,
1394                        }
1395                    })?;
1396                Ok(String::new())
1397            } else {
1398                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1399                    .await
1400                    .map_err(|e| match e.downcast::<reqwest::Error>() {
1401                        Ok(re) => friendly_error(&re, node),
1402                        Err(other) => other,
1403                    })?;
1404                Ok(output)
1405            }
1406        }
1407        Commands::Interventions { name } => {
1408            let resp = authed_get(
1409                &client,
1410                format!("{url}/api/v1/sessions/{name}/interventions"),
1411                token.as_deref(),
1412            )
1413            .send()
1414            .await
1415            .map_err(|e| friendly_error(&e, node))?;
1416            let text = ok_or_api_error(resp).await?;
1417            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1418            Ok(format_interventions(&events))
1419        }
1420        Commands::Ui => {
1421            let dashboard = base_url(node);
1422            open_browser(&dashboard)?;
1423            Ok(format!("Opening {dashboard}"))
1424        }
1425        Commands::Resume { name } => {
1426            let resp = authed_post(
1427                &client,
1428                format!("{url}/api/v1/sessions/{name}/resume"),
1429                token.as_deref(),
1430            )
1431            .send()
1432            .await
1433            .map_err(|e| friendly_error(&e, node))?;
1434            let text = ok_or_api_error(resp).await?;
1435            let session: Session = serde_json::from_str(&text)?;
1436            let backend_id = session
1437                .backend_session_id
1438                .as_deref()
1439                .unwrap_or(&session.name);
1440            eprintln!("Resumed session \"{}\"", session.name);
1441            let sid = session.id.to_string();
1442            check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1443            attach_session(backend_id)?;
1444            Ok(format!("Detached from session \"{}\".", session.name))
1445        }
1446        Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1447            .await
1448            .map_err(|e| match e.downcast::<reqwest::Error>() {
1449                Ok(re) => friendly_error(&re, node),
1450                Err(other) => other,
1451            }),
1452        Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
1453            .await
1454            .map_err(|e| match e.downcast::<reqwest::Error>() {
1455                Ok(re) => friendly_error(&re, node),
1456                Err(other) => other,
1457            }),
1458    }
1459}
1460
1461#[cfg(test)]
1462mod tests {
1463    use super::*;
1464
1465    #[test]
1466    fn test_base_url() {
1467        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1468        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1469    }
1470
1471    #[test]
1472    fn test_cli_parse_list() {
1473        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1474        assert_eq!(cli.node, "localhost:7433");
1475        assert!(matches!(cli.command, Some(Commands::List { .. })));
1476    }
1477
1478    #[test]
1479    fn test_cli_parse_nodes() {
1480        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1481        assert!(matches!(cli.command, Some(Commands::Nodes)));
1482    }
1483
1484    #[test]
1485    fn test_cli_parse_ui() {
1486        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1487        assert!(matches!(cli.command, Some(Commands::Ui)));
1488    }
1489
1490    #[test]
1491    fn test_cli_parse_ui_custom_node() {
1492        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1493        // With args_conflicts_with_subcommands, "ui" is parsed as path when --node is explicit
1494        assert_eq!(cli.node, "mac-mini:7433");
1495        assert_eq!(cli.path.as_deref(), Some("ui"));
1496    }
1497
1498    #[test]
1499    fn test_build_open_command() {
1500        let cmd = build_open_command("http://localhost:7433");
1501        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1502        assert_eq!(args, vec!["http://localhost:7433"]);
1503        #[cfg(target_os = "macos")]
1504        assert_eq!(cmd.get_program(), "open");
1505        #[cfg(target_os = "linux")]
1506        assert_eq!(cmd.get_program(), "xdg-open");
1507    }
1508
1509    #[test]
1510    fn test_cli_parse_spawn() {
1511        let cli = Cli::try_parse_from([
1512            "pulpo",
1513            "spawn",
1514            "my-task",
1515            "--workdir",
1516            "/tmp/repo",
1517            "--",
1518            "claude",
1519            "-p",
1520            "Fix the bug",
1521        ])
1522        .unwrap();
1523        assert!(matches!(
1524            &cli.command,
1525            Some(Commands::Spawn { name, workdir, command, .. })
1526                if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1527                && command == &["claude", "-p", "Fix the bug"]
1528        ));
1529    }
1530
1531    #[test]
1532    fn test_cli_parse_spawn_with_ink() {
1533        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1534        assert!(matches!(
1535            &cli.command,
1536            Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1537        ));
1538    }
1539
1540    #[test]
1541    fn test_cli_parse_spawn_with_description() {
1542        let cli =
1543            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1544                .unwrap();
1545        assert!(matches!(
1546            &cli.command,
1547            Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1548        ));
1549    }
1550
1551    #[test]
1552    fn test_cli_parse_spawn_name_positional() {
1553        let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1554        assert!(matches!(
1555            &cli.command,
1556            Some(Commands::Spawn { name, command, .. })
1557                if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1558        ));
1559    }
1560
1561    #[test]
1562    fn test_cli_parse_spawn_no_command() {
1563        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1564        assert!(matches!(
1565            &cli.command,
1566            Some(Commands::Spawn { command, .. }) if command.is_empty()
1567        ));
1568    }
1569
1570    #[test]
1571    fn test_cli_parse_spawn_idle_threshold() {
1572        let cli =
1573            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1574        assert!(matches!(
1575            &cli.command,
1576            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1577        ));
1578    }
1579
1580    #[test]
1581    fn test_cli_parse_spawn_idle_threshold_60() {
1582        let cli =
1583            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "60"]).unwrap();
1584        assert!(matches!(
1585            &cli.command,
1586            Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(60)
1587        ));
1588    }
1589
1590    #[test]
1591    fn test_cli_parse_spawn_secrets() {
1592        let cli = Cli::try_parse_from([
1593            "pulpo",
1594            "spawn",
1595            "my-task",
1596            "--secret",
1597            "GITHUB_TOKEN",
1598            "--secret",
1599            "NPM_TOKEN",
1600        ])
1601        .unwrap();
1602        assert!(matches!(
1603            &cli.command,
1604            Some(Commands::Spawn { secret, .. }) if secret == &["GITHUB_TOKEN", "NPM_TOKEN"]
1605        ));
1606    }
1607
1608    #[test]
1609    fn test_cli_parse_spawn_no_secrets() {
1610        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1611        assert!(matches!(
1612            &cli.command,
1613            Some(Commands::Spawn { secret, .. }) if secret.is_empty()
1614        ));
1615    }
1616
1617    #[test]
1618    fn test_cli_parse_secret_set_with_env() {
1619        let cli = Cli::try_parse_from([
1620            "pulpo",
1621            "secret",
1622            "set",
1623            "GH_WORK",
1624            "token123",
1625            "--env",
1626            "GITHUB_TOKEN",
1627        ])
1628        .unwrap();
1629        assert!(matches!(
1630            &cli.command,
1631            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1632                if name == "GH_WORK" && value == "token123" && env.as_deref() == Some("GITHUB_TOKEN")
1633        ));
1634    }
1635
1636    #[test]
1637    fn test_cli_parse_secret_set_without_env() {
1638        let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_KEY", "val"]).unwrap();
1639        assert!(matches!(
1640            &cli.command,
1641            Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1642                if name == "MY_KEY" && value == "val" && env.is_none()
1643        ));
1644    }
1645
1646    #[test]
1647    fn test_cli_parse_spawn_worktree() {
1648        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree"]).unwrap();
1649        assert!(matches!(
1650            &cli.command,
1651            Some(Commands::Spawn { worktree, .. }) if *worktree
1652        ));
1653    }
1654
1655    #[test]
1656    fn test_cli_parse_spawn_detach() {
1657        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
1658        assert!(matches!(
1659            &cli.command,
1660            Some(Commands::Spawn { detach, .. }) if *detach
1661        ));
1662    }
1663
1664    #[test]
1665    fn test_cli_parse_spawn_detach_short() {
1666        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
1667        assert!(matches!(
1668            &cli.command,
1669            Some(Commands::Spawn { detach, .. }) if *detach
1670        ));
1671    }
1672
1673    #[test]
1674    fn test_cli_parse_spawn_detach_default() {
1675        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1676        assert!(matches!(
1677            &cli.command,
1678            Some(Commands::Spawn { detach, .. }) if !detach
1679        ));
1680    }
1681
1682    #[test]
1683    fn test_cli_parse_logs() {
1684        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1685        assert!(matches!(
1686            &cli.command,
1687            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 100 && !follow
1688        ));
1689    }
1690
1691    #[test]
1692    fn test_cli_parse_logs_with_lines() {
1693        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1694        assert!(matches!(
1695            &cli.command,
1696            Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 50 && !follow
1697        ));
1698    }
1699
1700    #[test]
1701    fn test_cli_parse_logs_follow() {
1702        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1703        assert!(matches!(
1704            &cli.command,
1705            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1706        ));
1707    }
1708
1709    #[test]
1710    fn test_cli_parse_logs_follow_short() {
1711        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1712        assert!(matches!(
1713            &cli.command,
1714            Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1715        ));
1716    }
1717
1718    #[test]
1719    fn test_cli_parse_stop() {
1720        let cli = Cli::try_parse_from(["pulpo", "stop", "my-session"]).unwrap();
1721        assert!(matches!(
1722            &cli.command,
1723            Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
1724        ));
1725    }
1726
1727    #[test]
1728    fn test_cli_parse_stop_purge() {
1729        let cli = Cli::try_parse_from(["pulpo", "stop", "my-session", "--purge"]).unwrap();
1730        assert!(matches!(
1731            &cli.command,
1732            Some(Commands::Stop { names, purge }) if names == &["my-session"] && *purge
1733        ));
1734
1735        let cli = Cli::try_parse_from(["pulpo", "stop", "my-session", "-p"]).unwrap();
1736        assert!(matches!(
1737            &cli.command,
1738            Some(Commands::Stop { names, purge }) if names == &["my-session"] && *purge
1739        ));
1740    }
1741
1742    #[test]
1743    fn test_cli_parse_kill_alias() {
1744        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1745        assert!(matches!(
1746            &cli.command,
1747            Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
1748        ));
1749    }
1750
1751    #[test]
1752    fn test_cli_parse_resume() {
1753        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1754        assert!(matches!(
1755            &cli.command,
1756            Some(Commands::Resume { name }) if name == "my-session"
1757        ));
1758    }
1759
1760    #[test]
1761    fn test_cli_parse_input() {
1762        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1763        assert!(matches!(
1764            &cli.command,
1765            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("yes")
1766        ));
1767    }
1768
1769    #[test]
1770    fn test_cli_parse_input_no_text() {
1771        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1772        assert!(matches!(
1773            &cli.command,
1774            Some(Commands::Input { name, text }) if name == "my-session" && text.is_none()
1775        ));
1776    }
1777
1778    #[test]
1779    fn test_cli_parse_input_alias() {
1780        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1781        assert!(matches!(
1782            &cli.command,
1783            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
1784        ));
1785    }
1786
1787    #[test]
1788    fn test_cli_parse_custom_node() {
1789        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1790        assert_eq!(cli.node, "win-pc:8080");
1791        // With args_conflicts_with_subcommands, "list" is parsed as path when --node is explicit
1792        assert_eq!(cli.path.as_deref(), Some("list"));
1793    }
1794
1795    #[test]
1796    fn test_cli_version() {
1797        let result = Cli::try_parse_from(["pulpo", "--version"]);
1798        // clap exits with an error (kind DisplayVersion) when --version is used
1799        let err = result.unwrap_err();
1800        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1801    }
1802
1803    #[test]
1804    fn test_cli_parse_no_subcommand_succeeds() {
1805        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
1806        assert!(cli.command.is_none());
1807        assert!(cli.path.is_none());
1808    }
1809
1810    #[test]
1811    fn test_cli_debug() {
1812        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1813        let debug = format!("{cli:?}");
1814        assert!(debug.contains("List"));
1815    }
1816
1817    #[test]
1818    fn test_commands_debug() {
1819        let cmd = Commands::List { all: false };
1820        assert_eq!(format!("{cmd:?}"), "List { all: false }");
1821    }
1822
1823    /// A valid Session JSON for test responses.
1824    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1825
1826    /// A valid `CreateSessionResponse` JSON wrapping the session.
1827    fn test_create_response_json() -> String {
1828        format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1829    }
1830
1831    /// Start a lightweight test HTTP server and return its address.
1832    async fn start_test_server() -> String {
1833        use axum::http::StatusCode;
1834        use axum::{
1835            Json, Router,
1836            routing::{get, post},
1837        };
1838
1839        let create_json = test_create_response_json();
1840
1841        let app = Router::new()
1842            .route(
1843                "/api/v1/sessions",
1844                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1845                    (StatusCode::CREATED, create_json.clone())
1846                }),
1847            )
1848            .route(
1849                "/api/v1/sessions/{id}",
1850                get(|| async { TEST_SESSION_JSON.to_owned() }),
1851            )
1852            .route(
1853                "/api/v1/sessions/{id}/stop",
1854                post(|| async { StatusCode::NO_CONTENT }),
1855            )
1856            .route(
1857                "/api/v1/sessions/{id}/output",
1858                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1859            )
1860            .route(
1861                "/api/v1/peers",
1862                get(|| async {
1863                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1864                }),
1865            )
1866            .route(
1867                "/api/v1/sessions/{id}/resume",
1868                axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1869            )
1870            .route(
1871                "/api/v1/sessions/{id}/interventions",
1872                get(|| async { "[]".to_owned() }),
1873            )
1874            .route(
1875                "/api/v1/sessions/{id}/input",
1876                post(|| async { StatusCode::NO_CONTENT }),
1877            )
1878            .route(
1879                "/api/v1/schedules",
1880                get(|| async { Json::<Vec<()>>(vec![]) })
1881                    .post(|| async { StatusCode::CREATED }),
1882            )
1883            .route(
1884                "/api/v1/schedules/{id}",
1885                axum::routing::put(|| async { StatusCode::OK })
1886                    .delete(|| async { StatusCode::NO_CONTENT }),
1887            );
1888
1889        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1890        let addr = listener.local_addr().unwrap();
1891        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1892        format!("127.0.0.1:{}", addr.port())
1893    }
1894
1895    #[tokio::test]
1896    async fn test_execute_list_success() {
1897        let node = start_test_server().await;
1898        let cli = Cli {
1899            node,
1900            token: None,
1901            command: Some(Commands::List { all: false }),
1902            path: None,
1903        };
1904        let result = execute(&cli).await.unwrap();
1905        assert_eq!(result, "No sessions.");
1906    }
1907
1908    #[tokio::test]
1909    async fn test_execute_nodes_success() {
1910        let node = start_test_server().await;
1911        let cli = Cli {
1912            node,
1913            token: None,
1914            command: Some(Commands::Nodes),
1915            path: None,
1916        };
1917        let result = execute(&cli).await.unwrap();
1918        assert!(result.contains("test"));
1919        assert!(result.contains("(local)"));
1920        assert!(result.contains("NAME"));
1921    }
1922
1923    #[tokio::test]
1924    async fn test_execute_spawn_success() {
1925        let node = start_test_server().await;
1926        let cli = Cli {
1927            node,
1928            token: None,
1929            command: Some(Commands::Spawn {
1930                name: Some("test".into()),
1931                workdir: Some("/tmp/repo".into()),
1932                ink: None,
1933                description: None,
1934                detach: true,
1935                idle_threshold: None,
1936                auto: false,
1937                worktree: false,
1938                runtime: None,
1939                secret: vec![],
1940                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1941            }),
1942            path: None,
1943        };
1944        let result = execute(&cli).await.unwrap();
1945        assert!(result.contains("Created session"));
1946        assert!(result.contains("repo"));
1947    }
1948
1949    #[tokio::test]
1950    async fn test_execute_spawn_with_all_flags() {
1951        let node = start_test_server().await;
1952        let cli = Cli {
1953            node,
1954            token: None,
1955            command: Some(Commands::Spawn {
1956                name: Some("test".into()),
1957                workdir: Some("/tmp/repo".into()),
1958                ink: Some("coder".into()),
1959                description: Some("Fix the bug".into()),
1960                detach: true,
1961                idle_threshold: None,
1962                auto: false,
1963                worktree: false,
1964                runtime: None,
1965                secret: vec![],
1966                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1967            }),
1968            path: None,
1969        };
1970        let result = execute(&cli).await.unwrap();
1971        assert!(result.contains("Created session"));
1972    }
1973
1974    #[tokio::test]
1975    async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
1976        let node = start_test_server().await;
1977        let cli = Cli {
1978            node,
1979            token: None,
1980            command: Some(Commands::Spawn {
1981                name: Some("full-opts".into()),
1982                workdir: Some("/tmp/repo".into()),
1983                ink: Some("coder".into()),
1984                description: Some("Full options".into()),
1985                detach: true,
1986                idle_threshold: Some(120),
1987                auto: false,
1988                worktree: true,
1989                runtime: Some("docker".into()),
1990                secret: vec![],
1991                command: vec!["claude".into()],
1992            }),
1993            path: None,
1994        };
1995        let result = execute(&cli).await.unwrap();
1996        assert!(result.contains("Created session"));
1997    }
1998
1999    #[tokio::test]
2000    async fn test_execute_spawn_no_name_derives_from_workdir() {
2001        let node = start_test_server().await;
2002        let cli = Cli {
2003            node,
2004            token: None,
2005            command: Some(Commands::Spawn {
2006                name: None,
2007                workdir: Some("/tmp/my-project".into()),
2008                ink: None,
2009                description: None,
2010                detach: true,
2011                idle_threshold: None,
2012                auto: false,
2013                worktree: false,
2014                runtime: None,
2015                secret: vec![],
2016                command: vec!["echo".into(), "hello".into()],
2017            }),
2018            path: None,
2019        };
2020        let result = execute(&cli).await.unwrap();
2021        assert!(result.contains("Created session"));
2022    }
2023
2024    #[tokio::test]
2025    async fn test_execute_spawn_no_command() {
2026        let node = start_test_server().await;
2027        let cli = Cli {
2028            node,
2029            token: None,
2030            command: Some(Commands::Spawn {
2031                name: Some("test".into()),
2032                workdir: Some("/tmp/repo".into()),
2033                ink: None,
2034                description: None,
2035                detach: true,
2036                idle_threshold: None,
2037                auto: false,
2038                worktree: false,
2039                runtime: None,
2040                secret: vec![],
2041                command: vec![],
2042            }),
2043            path: None,
2044        };
2045        let result = execute(&cli).await.unwrap();
2046        assert!(result.contains("Created session"));
2047    }
2048
2049    #[tokio::test]
2050    async fn test_execute_spawn_with_name() {
2051        let node = start_test_server().await;
2052        let cli = Cli {
2053            node,
2054            token: None,
2055            command: Some(Commands::Spawn {
2056                name: Some("my-task".into()),
2057                workdir: Some("/tmp/repo".into()),
2058                ink: None,
2059                description: None,
2060                detach: true,
2061                idle_threshold: None,
2062                auto: false,
2063                worktree: false,
2064                runtime: None,
2065                secret: vec![],
2066                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2067            }),
2068            path: None,
2069        };
2070        let result = execute(&cli).await.unwrap();
2071        assert!(result.contains("Created session"));
2072    }
2073
2074    #[tokio::test]
2075    async fn test_execute_spawn_auto_attach() {
2076        let node = start_test_server().await;
2077        let cli = Cli {
2078            node,
2079            token: None,
2080            command: Some(Commands::Spawn {
2081                name: Some("test".into()),
2082                workdir: Some("/tmp/repo".into()),
2083                ink: None,
2084                description: None,
2085                detach: false,
2086                idle_threshold: None,
2087                auto: false,
2088                worktree: false,
2089                runtime: None,
2090                secret: vec![],
2091                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2092            }),
2093            path: None,
2094        };
2095        let result = execute(&cli).await.unwrap();
2096        // When not detached, spawn prints creation to stderr and returns detach message
2097        assert!(result.contains("Detached from session"));
2098    }
2099
2100    #[tokio::test]
2101    async fn test_execute_stop_success() {
2102        let node = start_test_server().await;
2103        let cli = Cli {
2104            node,
2105            token: None,
2106            command: Some(Commands::Stop {
2107                names: vec!["test-session".into()],
2108                purge: false,
2109            }),
2110            path: None,
2111        };
2112        let result = execute(&cli).await.unwrap();
2113        assert!(result.contains("stopped"));
2114        assert!(!result.contains("purged"));
2115    }
2116
2117    #[tokio::test]
2118    async fn test_execute_stop_with_purge() {
2119        let node = start_test_server().await;
2120        let cli = Cli {
2121            node,
2122            token: None,
2123            command: Some(Commands::Stop {
2124                names: vec!["test-session".into()],
2125                purge: true,
2126            }),
2127            path: None,
2128        };
2129        let result = execute(&cli).await.unwrap();
2130        assert!(result.contains("stopped and purged"));
2131    }
2132
2133    #[tokio::test]
2134    async fn test_execute_logs_success() {
2135        let node = start_test_server().await;
2136        let cli = Cli {
2137            node,
2138            token: None,
2139            command: Some(Commands::Logs {
2140                name: "test-session".into(),
2141                lines: 50,
2142                follow: false,
2143            }),
2144            path: None,
2145        };
2146        let result = execute(&cli).await.unwrap();
2147        assert!(result.contains("test output"));
2148    }
2149
2150    #[tokio::test]
2151    async fn test_execute_list_connection_refused() {
2152        let cli = Cli {
2153            node: "localhost:1".into(),
2154            token: None,
2155            command: Some(Commands::List { all: false }),
2156            path: None,
2157        };
2158        let result = execute(&cli).await;
2159        let err = result.unwrap_err().to_string();
2160        assert!(
2161            err.contains("Could not connect to pulpod"),
2162            "Expected friendly error, got: {err}"
2163        );
2164        assert!(err.contains("localhost:1"));
2165    }
2166
2167    #[tokio::test]
2168    async fn test_execute_nodes_connection_refused() {
2169        let cli = Cli {
2170            node: "localhost:1".into(),
2171            token: None,
2172            command: Some(Commands::Nodes),
2173            path: None,
2174        };
2175        let result = execute(&cli).await;
2176        let err = result.unwrap_err().to_string();
2177        assert!(err.contains("Could not connect to pulpod"));
2178    }
2179
2180    #[tokio::test]
2181    async fn test_execute_stop_error_response() {
2182        use axum::{Router, http::StatusCode, routing::post};
2183
2184        let app = Router::new().route(
2185            "/api/v1/sessions/{id}/stop",
2186            post(|| async {
2187                (
2188                    StatusCode::NOT_FOUND,
2189                    "{\"error\":\"session not found: test-session\"}",
2190                )
2191            }),
2192        );
2193        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2194        let addr = listener.local_addr().unwrap();
2195        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2196        let node = format!("127.0.0.1:{}", addr.port());
2197
2198        let cli = Cli {
2199            node,
2200            token: None,
2201            command: Some(Commands::Stop {
2202                names: vec!["test-session".into()],
2203                purge: false,
2204            }),
2205            path: None,
2206        };
2207        let result = execute(&cli).await.unwrap();
2208        assert!(result.contains("Error stopping test-session"), "{result}");
2209    }
2210
2211    #[tokio::test]
2212    async fn test_execute_logs_error_response() {
2213        use axum::{Router, http::StatusCode, routing::get};
2214
2215        let app = Router::new().route(
2216            "/api/v1/sessions/{id}/output",
2217            get(|| async {
2218                (
2219                    StatusCode::NOT_FOUND,
2220                    "{\"error\":\"session not found: ghost\"}",
2221                )
2222            }),
2223        );
2224        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2225        let addr = listener.local_addr().unwrap();
2226        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2227        let node = format!("127.0.0.1:{}", addr.port());
2228
2229        let cli = Cli {
2230            node,
2231            token: None,
2232            command: Some(Commands::Logs {
2233                name: "ghost".into(),
2234                lines: 50,
2235                follow: false,
2236            }),
2237            path: None,
2238        };
2239        let err = execute(&cli).await.unwrap_err();
2240        assert_eq!(err.to_string(), "session not found: ghost");
2241    }
2242
2243    #[tokio::test]
2244    async fn test_execute_resume_error_response() {
2245        use axum::{Router, http::StatusCode, routing::post};
2246
2247        let app = Router::new().route(
2248            "/api/v1/sessions/{id}/resume",
2249            post(|| async {
2250                (
2251                    StatusCode::BAD_REQUEST,
2252                    "{\"error\":\"session is not lost (status: active)\"}",
2253                )
2254            }),
2255        );
2256        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2257        let addr = listener.local_addr().unwrap();
2258        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2259        let node = format!("127.0.0.1:{}", addr.port());
2260
2261        let cli = Cli {
2262            node,
2263            token: None,
2264            command: Some(Commands::Resume {
2265                name: "test-session".into(),
2266            }),
2267            path: None,
2268        };
2269        let err = execute(&cli).await.unwrap_err();
2270        assert_eq!(err.to_string(), "session is not lost (status: active)");
2271    }
2272
2273    #[tokio::test]
2274    async fn test_execute_spawn_error_response() {
2275        use axum::{Router, http::StatusCode, routing::post};
2276
2277        let app = Router::new().route(
2278            "/api/v1/sessions",
2279            post(|| async {
2280                (
2281                    StatusCode::INTERNAL_SERVER_ERROR,
2282                    "{\"error\":\"failed to spawn session\"}",
2283                )
2284            }),
2285        );
2286        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2287        let addr = listener.local_addr().unwrap();
2288        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2289        let node = format!("127.0.0.1:{}", addr.port());
2290
2291        let cli = Cli {
2292            node,
2293            token: None,
2294            command: Some(Commands::Spawn {
2295                name: Some("test".into()),
2296                workdir: Some("/tmp/repo".into()),
2297                ink: None,
2298                description: None,
2299                detach: true,
2300                idle_threshold: None,
2301                auto: false,
2302                worktree: false,
2303                runtime: None,
2304                secret: vec![],
2305                command: vec!["test".into()],
2306            }),
2307            path: None,
2308        };
2309        let err = execute(&cli).await.unwrap_err();
2310        assert_eq!(err.to_string(), "failed to spawn session");
2311    }
2312
2313    #[tokio::test]
2314    async fn test_execute_interventions_error_response() {
2315        use axum::{Router, http::StatusCode, routing::get};
2316
2317        let app = Router::new().route(
2318            "/api/v1/sessions/{id}/interventions",
2319            get(|| async {
2320                (
2321                    StatusCode::NOT_FOUND,
2322                    "{\"error\":\"session not found: ghost\"}",
2323                )
2324            }),
2325        );
2326        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2327        let addr = listener.local_addr().unwrap();
2328        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2329        let node = format!("127.0.0.1:{}", addr.port());
2330
2331        let cli = Cli {
2332            node,
2333            token: None,
2334            command: Some(Commands::Interventions {
2335                name: "ghost".into(),
2336            }),
2337            path: None,
2338        };
2339        let err = execute(&cli).await.unwrap_err();
2340        assert_eq!(err.to_string(), "session not found: ghost");
2341    }
2342
2343    #[tokio::test]
2344    async fn test_execute_resume_success() {
2345        let node = start_test_server().await;
2346        let cli = Cli {
2347            node,
2348            token: None,
2349            command: Some(Commands::Resume {
2350                name: "test-session".into(),
2351            }),
2352            path: None,
2353        };
2354        let result = execute(&cli).await.unwrap();
2355        assert!(result.contains("Detached from session"));
2356    }
2357
2358    #[tokio::test]
2359    async fn test_execute_input_success() {
2360        let node = start_test_server().await;
2361        let cli = Cli {
2362            node,
2363            token: None,
2364            command: Some(Commands::Input {
2365                name: "test-session".into(),
2366                text: Some("yes".into()),
2367            }),
2368            path: None,
2369        };
2370        let result = execute(&cli).await.unwrap();
2371        assert!(result.contains("Sent input to session test-session"));
2372    }
2373
2374    #[tokio::test]
2375    async fn test_execute_input_no_text() {
2376        let node = start_test_server().await;
2377        let cli = Cli {
2378            node,
2379            token: None,
2380            command: Some(Commands::Input {
2381                name: "test-session".into(),
2382                text: None,
2383            }),
2384            path: None,
2385        };
2386        let result = execute(&cli).await.unwrap();
2387        assert!(result.contains("Sent input to session test-session"));
2388    }
2389
2390    #[tokio::test]
2391    async fn test_execute_input_connection_refused() {
2392        let cli = Cli {
2393            node: "localhost:1".into(),
2394            token: None,
2395            command: Some(Commands::Input {
2396                name: "test".into(),
2397                text: Some("y".into()),
2398            }),
2399            path: None,
2400        };
2401        let result = execute(&cli).await;
2402        let err = result.unwrap_err().to_string();
2403        assert!(err.contains("Could not connect to pulpod"));
2404    }
2405
2406    #[tokio::test]
2407    async fn test_execute_input_error_response() {
2408        use axum::{Router, http::StatusCode, routing::post};
2409
2410        let app = Router::new().route(
2411            "/api/v1/sessions/{id}/input",
2412            post(|| async {
2413                (
2414                    StatusCode::NOT_FOUND,
2415                    "{\"error\":\"session not found: ghost\"}",
2416                )
2417            }),
2418        );
2419        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2420        let addr = listener.local_addr().unwrap();
2421        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2422        let node = format!("127.0.0.1:{}", addr.port());
2423
2424        let cli = Cli {
2425            node,
2426            token: None,
2427            command: Some(Commands::Input {
2428                name: "ghost".into(),
2429                text: Some("y".into()),
2430            }),
2431            path: None,
2432        };
2433        let err = execute(&cli).await.unwrap_err();
2434        assert_eq!(err.to_string(), "session not found: ghost");
2435    }
2436
2437    #[tokio::test]
2438    async fn test_execute_ui() {
2439        let cli = Cli {
2440            node: "localhost:7433".into(),
2441            token: None,
2442            command: Some(Commands::Ui),
2443            path: None,
2444        };
2445        let result = execute(&cli).await.unwrap();
2446        assert!(result.contains("Opening"));
2447        assert!(result.contains("http://localhost:7433"));
2448    }
2449
2450    #[tokio::test]
2451    async fn test_execute_ui_custom_node() {
2452        let cli = Cli {
2453            node: "mac-mini:7433".into(),
2454            token: None,
2455            command: Some(Commands::Ui),
2456            path: None,
2457        };
2458        let result = execute(&cli).await.unwrap();
2459        assert!(result.contains("http://mac-mini:7433"));
2460    }
2461
2462    #[test]
2463    fn test_format_sessions_empty() {
2464        assert_eq!(format_sessions(&[]), "No sessions.");
2465    }
2466
2467    #[test]
2468    fn test_format_sessions_with_data() {
2469        use chrono::Utc;
2470        use pulpo_common::session::SessionStatus;
2471        use uuid::Uuid;
2472
2473        let sessions = vec![Session {
2474            id: Uuid::nil(),
2475            name: "my-api".into(),
2476            workdir: "/tmp/repo".into(),
2477            command: "claude -p 'Fix the bug'".into(),
2478            description: Some("Fix the bug".into()),
2479            status: SessionStatus::Active,
2480            exit_code: None,
2481            backend_session_id: None,
2482            output_snapshot: None,
2483            metadata: None,
2484            ink: None,
2485            intervention_code: None,
2486            intervention_reason: None,
2487            intervention_at: None,
2488            last_output_at: None,
2489            idle_since: None,
2490            idle_threshold_secs: None,
2491            worktree_path: None,
2492            runtime: Runtime::Tmux,
2493            created_at: Utc::now(),
2494            updated_at: Utc::now(),
2495        }];
2496        let output = format_sessions(&sessions);
2497        assert!(output.contains("ID"));
2498        assert!(output.contains("NAME"));
2499        assert!(output.contains("RUNTIME"));
2500        assert!(output.contains("COMMAND"));
2501        assert!(output.contains("00000000"));
2502        assert!(output.contains("my-api"));
2503        assert!(output.contains("active"));
2504        assert!(output.contains("tmux"));
2505        assert!(output.contains("claude -p 'Fix the bug'"));
2506    }
2507
2508    #[test]
2509    fn test_format_sessions_docker_runtime() {
2510        use chrono::Utc;
2511        use pulpo_common::session::SessionStatus;
2512        use uuid::Uuid;
2513
2514        let sessions = vec![Session {
2515            id: Uuid::nil(),
2516            name: "sandbox-test".into(),
2517            workdir: "/tmp".into(),
2518            command: "claude".into(),
2519            description: None,
2520            status: SessionStatus::Active,
2521            exit_code: None,
2522            backend_session_id: Some("docker:pulpo-sandbox-test".into()),
2523            output_snapshot: None,
2524            metadata: None,
2525            ink: None,
2526            intervention_code: None,
2527            intervention_reason: None,
2528            intervention_at: None,
2529            last_output_at: None,
2530            idle_since: None,
2531            idle_threshold_secs: None,
2532            worktree_path: None,
2533            runtime: Runtime::Docker,
2534            created_at: Utc::now(),
2535            updated_at: Utc::now(),
2536        }];
2537        let output = format_sessions(&sessions);
2538        assert!(output.contains("docker"));
2539    }
2540
2541    #[test]
2542    fn test_format_sessions_long_command_truncated() {
2543        use chrono::Utc;
2544        use pulpo_common::session::SessionStatus;
2545        use uuid::Uuid;
2546
2547        let sessions = vec![Session {
2548            id: Uuid::nil(),
2549            name: "test".into(),
2550            workdir: "/tmp".into(),
2551            command:
2552                "claude -p 'A very long command that exceeds fifty characters in total length here'"
2553                    .into(),
2554            description: None,
2555            status: SessionStatus::Ready,
2556            exit_code: None,
2557            backend_session_id: None,
2558            output_snapshot: None,
2559            metadata: None,
2560            ink: None,
2561            intervention_code: None,
2562            intervention_reason: None,
2563            intervention_at: None,
2564            last_output_at: None,
2565            idle_since: None,
2566            idle_threshold_secs: None,
2567            worktree_path: None,
2568            runtime: Runtime::Tmux,
2569            created_at: Utc::now(),
2570            updated_at: Utc::now(),
2571        }];
2572        let output = format_sessions(&sessions);
2573        assert!(output.contains("..."));
2574    }
2575
2576    #[test]
2577    fn test_format_sessions_worktree_indicator() {
2578        use chrono::Utc;
2579        use pulpo_common::session::SessionStatus;
2580        use uuid::Uuid;
2581
2582        let sessions = vec![Session {
2583            id: Uuid::nil(),
2584            name: "wt-task".into(),
2585            workdir: "/repo".into(),
2586            command: "claude".into(),
2587            description: None,
2588            status: SessionStatus::Active,
2589            exit_code: None,
2590            backend_session_id: None,
2591            output_snapshot: None,
2592            metadata: None,
2593            ink: None,
2594            intervention_code: None,
2595            intervention_reason: None,
2596            intervention_at: None,
2597            last_output_at: None,
2598            idle_since: None,
2599            idle_threshold_secs: None,
2600            worktree_path: Some("/home/user/.pulpo/worktrees/wt-task".into()),
2601            runtime: Runtime::Tmux,
2602            created_at: Utc::now(),
2603            updated_at: Utc::now(),
2604        }];
2605        let output = format_sessions(&sessions);
2606        assert!(
2607            output.contains("[wt]"),
2608            "should show worktree indicator: {output}"
2609        );
2610        assert!(output.contains("wt-task [wt]"));
2611    }
2612
/// A session whose metadata carries a `pr_url` entry is listed with a `[PR]`
/// marker directly after its name.
#[test]
fn test_format_sessions_pr_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use std::collections::HashMap;
    use uuid::Uuid;

    let mut pr_metadata = HashMap::new();
    pr_metadata.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());

    let session = Session {
        id: Uuid::nil(),
        name: "pr-task".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: Some(pr_metadata),
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let listed = vec![session];

    let rendered = format_sessions(&listed);
    assert!(
        rendered.contains("[PR]"),
        "should show PR indicator: {rendered}"
    );
    // The marker must sit right next to the session name.
    assert!(rendered.contains("pr-task [PR]"));
}
2652
/// A session with both a worktree path and a `pr_url` shows both markers,
/// worktree first.
#[test]
fn test_format_sessions_worktree_and_pr_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use std::collections::HashMap;
    use uuid::Uuid;

    let mut pr_metadata = HashMap::new();
    pr_metadata.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());

    let session = Session {
        id: Uuid::nil(),
        name: "both-task".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: Some(pr_metadata),
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: Some("/home/user/.pulpo/worktrees/both-task".into()),
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let listed = vec![session];

    let rendered = format_sessions(&listed);
    assert!(
        rendered.contains("[wt] [PR]"),
        "should show both indicators: {rendered}"
    );
}
2691
/// A session without any metadata must not be rendered with the `[PR]` marker.
#[test]
fn test_format_sessions_no_pr_without_metadata() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "no-pr".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let listed = vec![session];

    let rendered = format_sessions(&listed);
    let has_pr_marker = rendered.contains("[PR]");
    assert!(!has_pr_marker, "should not show PR indicator: {rendered}");
}
2727
/// `format_nodes` lists the local node (tagged `(local)`) and each peer,
/// including the peer's session count when one is known.
#[test]
fn test_format_nodes() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let resp = PeersResponse {
        local: NodeInfo {
            name: "mac-mini".into(),
            hostname: "h".into(),
            os: "macos".into(),
            arch: "arm64".into(),
            cpus: 8,
            memory_mb: 16384,
            gpu: None,
        },
        peers: vec![PeerInfo {
            name: "win-pc".into(),
            address: "win-pc:7433".into(),
            status: PeerStatus::Online,
            node_info: None,
            // Chosen so its digits appear nowhere else in this fixture.
            // A count of 3 would be matched by the port "7433" or the
            // memory size "16384", so the check could never fail.
            session_count: Some(42),
            source: PeerSource::Configured,
        }],
    };
    let output = format_nodes(&resp);
    assert!(output.contains("mac-mini"));
    assert!(output.contains("(local)"));
    assert!(output.contains("win-pc"));
    assert!(
        output.contains("42"),
        "should show the peer session count: {output}"
    );
}
2758
/// A peer with no known session count is rendered with a `-` placeholder,
/// and its offline status is shown.
#[test]
fn test_format_nodes_no_session_count() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local_node = NodeInfo {
        name: "local".into(),
        hostname: "h".into(),
        os: "linux".into(),
        arch: "x86_64".into(),
        cpus: 4,
        memory_mb: 8192,
        gpu: None,
    };
    let offline_peer = PeerInfo {
        name: "peer".into(),
        address: "peer:7433".into(),
        status: PeerStatus::Offline,
        node_info: None,
        session_count: None,
        source: PeerSource::Configured,
    };
    let resp = PeersResponse {
        local: local_node,
        peers: vec![offline_peer],
    };

    let rendered = format_nodes(&resp);
    assert!(rendered.contains("offline"));
    // No session count → shows "-"
    let rows: Vec<&str> = rendered.lines().collect();
    assert!(rows[2].contains('-'));
}
2789
2790    #[tokio::test]
2791    async fn test_execute_resume_connection_refused() {
2792        let cli = Cli {
2793            node: "localhost:1".into(),
2794            token: None,
2795            command: Some(Commands::Resume {
2796                name: "test".into(),
2797            }),
2798            path: None,
2799        };
2800        let result = execute(&cli).await;
2801        let err = result.unwrap_err().to_string();
2802        assert!(err.contains("Could not connect to pulpod"));
2803    }
2804
2805    #[tokio::test]
2806    async fn test_execute_spawn_connection_refused() {
2807        let cli = Cli {
2808            node: "localhost:1".into(),
2809            token: None,
2810            command: Some(Commands::Spawn {
2811                name: Some("test".into()),
2812                workdir: Some("/tmp".into()),
2813                ink: None,
2814                description: None,
2815                detach: true,
2816                idle_threshold: None,
2817                auto: false,
2818                worktree: false,
2819                runtime: None,
2820                secret: vec![],
2821                command: vec!["test".into()],
2822            }),
2823            path: None,
2824        };
2825        let result = execute(&cli).await;
2826        let err = result.unwrap_err().to_string();
2827        assert!(err.contains("Could not connect to pulpod"));
2828    }
2829
2830    #[tokio::test]
2831    async fn test_execute_stop_connection_refused() {
2832        let cli = Cli {
2833            node: "localhost:1".into(),
2834            token: None,
2835            command: Some(Commands::Stop {
2836                names: vec!["test".into()],
2837                purge: false,
2838            }),
2839            path: None,
2840        };
2841        let result = execute(&cli).await;
2842        let err = result.unwrap_err().to_string();
2843        assert!(err.contains("Could not connect to pulpod"));
2844    }
2845
2846    #[tokio::test]
2847    async fn test_execute_logs_connection_refused() {
2848        let cli = Cli {
2849            node: "localhost:1".into(),
2850            token: None,
2851            command: Some(Commands::Logs {
2852                name: "test".into(),
2853                lines: 50,
2854                follow: false,
2855            }),
2856            path: None,
2857        };
2858        let result = execute(&cli).await;
2859        let err = result.unwrap_err().to_string();
2860        assert!(err.contains("Could not connect to pulpod"));
2861    }
2862
2863    #[tokio::test]
2864    async fn test_friendly_error_connect() {
2865        // Make a request to a closed port to get a connect error
2866        let err = reqwest::Client::new()
2867            .get("http://127.0.0.1:1")
2868            .send()
2869            .await
2870            .unwrap_err();
2871        let friendly = friendly_error(&err, "test-node:1");
2872        let msg = friendly.to_string();
2873        assert!(
2874            msg.contains("Could not connect"),
2875            "Expected connect message, got: {msg}"
2876        );
2877    }
2878
2879    #[tokio::test]
2880    async fn test_friendly_error_other() {
2881        // A request to an invalid URL creates a builder error, not a connect error
2882        let err = reqwest::Client::new()
2883            .get("http://[::invalid::url")
2884            .send()
2885            .await
2886            .unwrap_err();
2887        let friendly = friendly_error(&err, "bad-host");
2888        let msg = friendly.to_string();
2889        assert!(
2890            msg.contains("Network error"),
2891            "Expected network error message, got: {msg}"
2892        );
2893        assert!(msg.contains("bad-host"));
2894    }
2895
2896    // -- Auth helper tests --
2897
/// `is_localhost` accepts the loopback spellings (with or without a port)
/// and rejects other hosts.
#[test]
fn test_is_localhost_variants() {
    let loopback = [
        "localhost:7433",
        "127.0.0.1:7433",
        "[::1]:7433",
        "::1",
        "localhost",
    ];
    for node in loopback {
        assert!(is_localhost(node));
    }

    let remote = ["mac-mini:7433", "192.168.1.100:7433"];
    for node in remote {
        assert!(!is_localhost(node));
    }
}
2908
/// `authed_get` with a token attaches it as a `Bearer` authorization header.
#[test]
fn test_authed_get_with_token() {
    let http = reqwest::Client::new();
    let builder = authed_get(&http, "http://h:1/api".into(), Some("tok"));
    let request = builder.build().unwrap();

    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer tok");
}
2923
/// `authed_get` without a token sends no authorization header.
#[test]
fn test_authed_get_without_token() {
    let http = reqwest::Client::new();
    let builder = authed_get(&http, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();

    let header = request.headers().get("authorization");
    assert!(header.is_none());
}
2932
/// `authed_post` with a token attaches it as a `Bearer` authorization header.
#[test]
fn test_authed_post_with_token() {
    let http = reqwest::Client::new();
    let builder = authed_post(&http, "http://h:1/api".into(), Some("secret"));
    let request = builder.build().unwrap();

    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer secret");
}
2947
/// `authed_post` without a token sends no authorization header.
#[test]
fn test_authed_post_without_token() {
    let http = reqwest::Client::new();
    let builder = authed_post(&http, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();

    let header = request.headers().get("authorization");
    assert!(header.is_none());
}
2956
/// `authed_delete` with a token attaches it as a `Bearer` authorization header.
#[test]
fn test_authed_delete_with_token() {
    let http = reqwest::Client::new();
    let builder = authed_delete(&http, "http://h:1/api".into(), Some("del-tok"));
    let request = builder.build().unwrap();

    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer del-tok");
}
2971
/// `authed_delete` without a token sends no authorization header.
#[test]
fn test_authed_delete_without_token() {
    let http = reqwest::Client::new();
    let builder = authed_delete(&http, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();

    let header = request.headers().get("authorization");
    assert!(header.is_none());
}
2980
2981    #[tokio::test]
2982    async fn test_resolve_token_explicit() {
2983        let client = reqwest::Client::new();
2984        let token =
2985            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2986        assert_eq!(token, Some("my-tok".into()));
2987    }
2988
2989    #[tokio::test]
2990    async fn test_resolve_token_remote_no_explicit() {
2991        let client = reqwest::Client::new();
2992        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2993        assert_eq!(token, None);
2994    }
2995
2996    #[tokio::test]
2997    async fn test_resolve_token_localhost_auto_discover() {
2998        use axum::{Json, Router, routing::get};
2999
3000        let app = Router::new().route(
3001            "/api/v1/auth/token",
3002            get(|| async {
3003                Json(AuthTokenResponse {
3004                    token: "discovered".into(),
3005                })
3006            }),
3007        );
3008        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3009        let addr = listener.local_addr().unwrap();
3010        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3011
3012        let node = format!("localhost:{}", addr.port());
3013        let base = base_url(&node);
3014        let client = reqwest::Client::new();
3015        let token = resolve_token(&client, &base, &node, None).await;
3016        assert_eq!(token, Some("discovered".into()));
3017    }
3018
3019    #[tokio::test]
3020    async fn test_discover_token_empty_returns_none() {
3021        use axum::{Json, Router, routing::get};
3022
3023        let app = Router::new().route(
3024            "/api/v1/auth/token",
3025            get(|| async {
3026                Json(AuthTokenResponse {
3027                    token: String::new(),
3028                })
3029            }),
3030        );
3031        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3032        let addr = listener.local_addr().unwrap();
3033        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3034
3035        let base = format!("http://127.0.0.1:{}", addr.port());
3036        let client = reqwest::Client::new();
3037        assert_eq!(discover_token(&client, &base).await, None);
3038    }
3039
3040    #[tokio::test]
3041    async fn test_discover_token_unreachable_returns_none() {
3042        let client = reqwest::Client::new();
3043        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3044    }
3045
/// `--token <value>` is parsed into `Cli::token`.
#[test]
fn test_cli_parse_with_token() {
    let argv = ["pulpo", "--token", "my-secret", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert_eq!(parsed.token.as_deref(), Some("my-secret"));
}
3051
/// Without `--token`, `Cli::token` stays `None`.
#[test]
fn test_cli_parse_without_token() {
    let argv = ["pulpo", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert!(parsed.token.is_none());
}
3057
3058    #[tokio::test]
3059    async fn test_execute_with_explicit_token_sends_header() {
3060        use axum::{Router, extract::Request, http::StatusCode, routing::get};
3061
3062        let app = Router::new().route(
3063            "/api/v1/sessions",
3064            get(|req: Request| async move {
3065                let auth = req
3066                    .headers()
3067                    .get("authorization")
3068                    .and_then(|v| v.to_str().ok())
3069                    .unwrap_or("");
3070                assert_eq!(auth, "Bearer test-token");
3071                (StatusCode::OK, "[]".to_owned())
3072            }),
3073        );
3074        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3075        let addr = listener.local_addr().unwrap();
3076        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3077        let node = format!("127.0.0.1:{}", addr.port());
3078
3079        let cli = Cli {
3080            node,
3081            token: Some("test-token".into()),
3082            command: Some(Commands::List { all: false }),
3083            path: None,
3084        };
3085        let result = execute(&cli).await.unwrap();
3086        assert_eq!(result, "No sessions.");
3087    }
3088
3089    // -- Interventions tests --
3090
/// `pulpo interventions <name>` parses into `Commands::Interventions`.
#[test]
fn test_cli_parse_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Interventions { name }) if name == "my-session"
    );
    assert!(is_expected);
}
3099
/// With no events, `format_interventions` returns a fixed placeholder message.
#[test]
fn test_format_interventions_empty() {
    let rendered = format_interventions(&[]);
    assert_eq!(rendered, "No intervention events.");
}
3104
/// With events, `format_interventions` renders the column headers plus each
/// event's reason and timestamp.
#[test]
fn test_format_interventions_with_data() {
    let memory_event = InterventionEventResponse {
        id: 1,
        session_id: "sess-1".into(),
        code: None,
        reason: "Memory exceeded threshold".into(),
        created_at: "2026-01-01T00:00:00Z".into(),
    };
    let idle_event = InterventionEventResponse {
        id: 2,
        session_id: "sess-1".into(),
        code: None,
        reason: "Idle for 10 minutes".into(),
        created_at: "2026-01-02T00:00:00Z".into(),
    };
    let events = vec![memory_event, idle_event];

    let rendered = format_interventions(&events);
    let expected_fragments = [
        "ID",
        "TIMESTAMP",
        "REASON",
        "Memory exceeded threshold",
        "Idle for 10 minutes",
        "2026-01-01T00:00:00Z",
    ];
    for fragment in expected_fragments {
        assert!(rendered.contains(fragment));
    }
}
3131
3132    #[tokio::test]
3133    async fn test_execute_interventions_empty() {
3134        let node = start_test_server().await;
3135        let cli = Cli {
3136            node,
3137            token: None,
3138            command: Some(Commands::Interventions {
3139                name: "my-session".into(),
3140            }),
3141            path: None,
3142        };
3143        let result = execute(&cli).await.unwrap();
3144        assert_eq!(result, "No intervention events.");
3145    }
3146
3147    #[tokio::test]
3148    async fn test_execute_interventions_with_data() {
3149        use axum::{Router, routing::get};
3150
3151        let app = Router::new().route(
3152            "/api/v1/sessions/{id}/interventions",
3153            get(|| async {
3154                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3155                    .to_owned()
3156            }),
3157        );
3158        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3159        let addr = listener.local_addr().unwrap();
3160        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3161        let node = format!("127.0.0.1:{}", addr.port());
3162
3163        let cli = Cli {
3164            node,
3165            token: None,
3166            command: Some(Commands::Interventions {
3167                name: "test".into(),
3168            }),
3169            path: None,
3170        };
3171        let result = execute(&cli).await.unwrap();
3172        assert!(result.contains("OOM"));
3173        assert!(result.contains("2026-01-01T00:00:00Z"));
3174    }
3175
3176    #[tokio::test]
3177    async fn test_execute_interventions_connection_refused() {
3178        let cli = Cli {
3179            node: "localhost:1".into(),
3180            token: None,
3181            command: Some(Commands::Interventions {
3182                name: "test".into(),
3183            }),
3184            path: None,
3185        };
3186        let result = execute(&cli).await;
3187        let err = result.unwrap_err().to_string();
3188        assert!(err.contains("Could not connect to pulpod"));
3189    }
3190
3191    // -- Attach command tests --
3192
/// A plain backend id is attached to with `tmux attach-session -t <id>`.
#[test]
fn test_build_attach_command_tmux() {
    let attach = build_attach_command("my-session");
    assert_eq!(attach.get_program(), "tmux");

    let argv: Vec<&std::ffi::OsStr> = attach.get_args().collect();
    let expected = vec!["attach-session", "-t", "my-session"];
    assert_eq!(argv, expected);
}
3200
/// A `docker:`-prefixed backend id is attached to with
/// `docker exec -it <container> /bin/sh`.
#[test]
fn test_build_attach_command_docker() {
    let attach = build_attach_command("docker:pulpo-my-task");
    assert_eq!(attach.get_program(), "docker");

    let argv: Vec<&std::ffi::OsStr> = attach.get_args().collect();
    let expected = vec!["exec", "-it", "pulpo-my-task", "/bin/sh"];
    assert_eq!(argv, expected);
}
3208
/// `pulpo attach <name>` parses into `Commands::Attach`.
#[test]
fn test_cli_parse_attach() {
    let parsed = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Attach { name }) if name == "my-session"
    );
    assert!(is_expected);
}
3217
/// The `a` alias parses into `Commands::Attach`.
#[test]
fn test_cli_parse_attach_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Attach { name }) if name == "my-session"
    );
    assert!(is_expected);
}
3226
3227    #[tokio::test]
3228    async fn test_execute_attach_success() {
3229        let node = start_test_server().await;
3230        let cli = Cli {
3231            node,
3232            token: None,
3233            command: Some(Commands::Attach {
3234                name: "test-session".into(),
3235            }),
3236            path: None,
3237        };
3238        let result = execute(&cli).await.unwrap();
3239        assert!(result.contains("Detached from session test-session"));
3240    }
3241
3242    #[tokio::test]
3243    async fn test_execute_attach_with_backend_session_id() {
3244        use axum::{Router, routing::get};
3245        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3246        let app = Router::new().route(
3247            "/api/v1/sessions/{id}",
3248            get(move || async move { session_json.to_owned() }),
3249        );
3250        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3251        let addr = listener.local_addr().unwrap();
3252        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3253
3254        let cli = Cli {
3255            node: format!("127.0.0.1:{}", addr.port()),
3256            token: None,
3257            command: Some(Commands::Attach {
3258                name: "my-session".into(),
3259            }),
3260            path: None,
3261        };
3262        let result = execute(&cli).await.unwrap();
3263        assert!(result.contains("Detached from session my-session"));
3264    }
3265
3266    #[tokio::test]
3267    async fn test_execute_attach_connection_refused() {
3268        let cli = Cli {
3269            node: "localhost:1".into(),
3270            token: None,
3271            command: Some(Commands::Attach {
3272                name: "test-session".into(),
3273            }),
3274            path: None,
3275        };
3276        let result = execute(&cli).await;
3277        let err = result.unwrap_err().to_string();
3278        assert!(err.contains("Could not connect to pulpod"));
3279    }
3280
3281    #[tokio::test]
3282    async fn test_execute_attach_error_response() {
3283        use axum::{Router, http::StatusCode, routing::get};
3284        let app = Router::new().route(
3285            "/api/v1/sessions/{id}",
3286            get(|| async {
3287                (
3288                    StatusCode::NOT_FOUND,
3289                    r#"{"error":"session not found"}"#.to_owned(),
3290                )
3291            }),
3292        );
3293        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3294        let addr = listener.local_addr().unwrap();
3295        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3296
3297        let cli = Cli {
3298            node: format!("127.0.0.1:{}", addr.port()),
3299            token: None,
3300            command: Some(Commands::Attach {
3301                name: "nonexistent".into(),
3302            }),
3303            path: None,
3304        };
3305        let result = execute(&cli).await;
3306        let err = result.unwrap_err().to_string();
3307        assert!(err.contains("session not found"));
3308    }
3309
3310    #[tokio::test]
3311    async fn test_execute_attach_stale_session() {
3312        use axum::{Router, routing::get};
3313        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3314        let app = Router::new().route(
3315            "/api/v1/sessions/{id}",
3316            get(move || async move { session_json.to_owned() }),
3317        );
3318        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3319        let addr = listener.local_addr().unwrap();
3320        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3321
3322        let cli = Cli {
3323            node: format!("127.0.0.1:{}", addr.port()),
3324            token: None,
3325            command: Some(Commands::Attach {
3326                name: "stale-sess".into(),
3327            }),
3328            path: None,
3329        };
3330        let result = execute(&cli).await;
3331        let err = result.unwrap_err().to_string();
3332        assert!(err.contains("lost"));
3333        assert!(err.contains("pulpo resume"));
3334    }
3335
3336    #[tokio::test]
3337    async fn test_execute_attach_dead_session() {
3338        use axum::{Router, routing::get};
3339        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3340        let app = Router::new().route(
3341            "/api/v1/sessions/{id}",
3342            get(move || async move { session_json.to_owned() }),
3343        );
3344        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3345        let addr = listener.local_addr().unwrap();
3346        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3347
3348        let cli = Cli {
3349            node: format!("127.0.0.1:{}", addr.port()),
3350            token: None,
3351            command: Some(Commands::Attach {
3352                name: "dead-sess".into(),
3353            }),
3354            path: None,
3355        };
3356        let result = execute(&cli).await;
3357        let err = result.unwrap_err().to_string();
3358        assert!(err.contains("stopped"));
3359        assert!(err.contains("cannot attach"));
3360    }
3361
3362    // -- Alias parse tests --
3363
/// The `s` alias parses into `Commands::Spawn`.
#[test]
fn test_cli_parse_alias_spawn() {
    let argv = ["pulpo", "s", "my-task", "--", "echo", "hello"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_spawn = matches!(&parsed.command, Some(Commands::Spawn { .. }));
    assert!(is_spawn);
}
3369
/// The `ls` alias parses into `Commands::List` with `all` off.
#[test]
fn test_cli_parse_alias_list() {
    let parsed = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
    let is_plain_list = matches!(&parsed.command, Some(Commands::List { all: false }));
    assert!(is_plain_list);
}
3375
/// Both `ls -a` and `list --all` turn on `all`.
#[test]
fn test_cli_parse_list_all() {
    let invocations = [["pulpo", "ls", "-a"], ["pulpo", "list", "--all"]];
    for argv in invocations {
        let parsed = Cli::try_parse_from(argv).unwrap();
        assert!(matches!(&parsed.command, Some(Commands::List { all: true })));
    }
}
3384
/// The `l` alias parses into `Commands::Logs` for the given session.
#[test]
fn test_cli_parse_alias_logs() {
    let parsed = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Logs { name, .. }) if name == "my-session"
    );
    assert!(is_expected);
}
3393
/// The `k` alias parses into `Commands::Stop` with the given name and
/// `purge` off.
#[test]
fn test_cli_parse_alias_stop() {
    let parsed = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
    );
    assert!(is_expected);
}
3402
/// The `r` alias parses into `Commands::Resume` for the given session.
#[test]
fn test_cli_parse_alias_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Resume { name }) if name == "my-session"
    );
    assert!(is_expected);
}
3411
/// The `n` alias parses into `Commands::Nodes`.
#[test]
fn test_cli_parse_alias_nodes() {
    let parsed = Cli::try_parse_from(["pulpo", "n"]).unwrap();
    let is_nodes = matches!(&parsed.command, Some(Commands::Nodes));
    assert!(is_nodes);
}
3417
/// The `iv` alias parses into `Commands::Interventions` for the given session.
#[test]
fn test_cli_parse_alias_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Interventions { name }) if name == "my-session"
    );
    assert!(is_expected);
}
3426
/// A JSON body with an `error` field is reduced to that field's text.
#[test]
fn test_api_error_json() {
    let body = r#"{"error":"session not found: foo"}"#;
    let rendered = api_error(body).to_string();
    assert_eq!(rendered, "session not found: foo");
}
3432
/// A non-JSON body is passed through unchanged as the error text.
#[test]
fn test_api_error_plain_text() {
    let body = "plain text error";
    let rendered = api_error(body).to_string();
    assert_eq!(rendered, "plain text error");
}
3438
3439    // -- diff_output tests --
3440
/// With no previous output, the whole new output is returned.
#[test]
fn test_diff_output_empty_prev() {
    let fresh = "line1\nline2\n";
    let delta = diff_output("", fresh);
    assert_eq!(delta, "line1\nline2\n");
}
3445
/// Identical previous and new output produce an empty diff.
#[test]
fn test_diff_output_identical() {
    let snapshot = "line1\nline2";
    let delta = diff_output(snapshot, snapshot);
    assert_eq!(delta, "");
}
3450
/// Lines appended after the previous output are the only ones returned.
#[test]
fn test_diff_output_new_lines_appended() {
    let delta = diff_output("line1\nline2", "line1\nline2\nline3\nline4");
    assert_eq!(delta, "line3\nline4");
}
3457
/// In a fixed-size window where old lines scroll off the top, only the line
/// that newly appeared at the bottom is returned.
#[test]
fn test_diff_output_scrolled_window() {
    // Three-line window: "line1" scrolled off, "line4" scrolled in.
    let before = "line1\nline2\nline3";
    let after = "line2\nline3\nline4";
    let delta = diff_output(before, after);
    assert_eq!(delta, "line4");
}
3465
/// When nothing overlaps, the whole new output is returned.
#[test]
fn test_diff_output_completely_different() {
    let delta = diff_output("aaa\nbbb", "xxx\nyyy");
    assert_eq!(delta, "xxx\nyyy");
}
3472
/// A last-line match whose preceding lines differ does not count as an
/// overlap, so the whole new output is returned.
#[test]
fn test_diff_output_last_line_matches_but_overlap_fails() {
    // "common" (last line of `before`) also appears in `after`, but the line
    // before it differs ("aaa" vs "zzz"), so there is no verified overlap
    // and everything is returned.
    let before = "aaa\ncommon";
    let after = "zzz\ncommon\nnew_line";
    let delta = diff_output(before, after);
    assert_eq!(delta, "zzz\ncommon\nnew_line");
}
3483
3484    #[test]
3485    fn test_diff_output_new_empty() {
3486        assert_eq!(diff_output("line1", ""), "");
3487    }
3488
3489    // -- follow_logs tests --
3490
3491    /// Start a test server that simulates evolving output and session status transitions.
3492    /// Start a test server that simulates evolving output with agent exit marker.
3493    async fn start_follow_test_server() -> String {
3494        use axum::{Router, extract::Path, extract::Query, routing::get};
3495        use std::sync::Arc;
3496        use std::sync::atomic::{AtomicUsize, Ordering};
3497
3498        let call_count = Arc::new(AtomicUsize::new(0));
3499        let output_count = call_count.clone();
3500
3501        let app = Router::new()
3502            .route(
3503                "/api/v1/sessions/{id}/output",
3504                get(
3505                    move |_path: Path<String>,
3506                          _query: Query<std::collections::HashMap<String, String>>| {
3507                        let count = output_count.clone();
3508                        async move {
3509                            let n = count.fetch_add(1, Ordering::SeqCst);
3510                            let output = match n {
3511                                0 => "line1\nline2".to_owned(),
3512                                1 => "line1\nline2\nline3".to_owned(),
3513                                _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
3514                            };
3515                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
3516                        }
3517                    },
3518                ),
3519            )
3520            .route(
3521                "/api/v1/sessions/{id}",
3522                get(|_path: Path<String>| async {
3523                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3524                }),
3525            );
3526
3527        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3528        let addr = listener.local_addr().unwrap();
3529        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3530        format!("http://127.0.0.1:{}", addr.port())
3531    }
3532
3533    #[tokio::test]
3534    async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
3535        let base = start_follow_test_server().await;
3536        let client = reqwest::Client::new();
3537        let mut buf = Vec::new();
3538
3539        follow_logs(&client, &base, "test", 100, None, &mut buf)
3540            .await
3541            .unwrap();
3542
3543        let output = String::from_utf8(buf).unwrap();
3544        // Should contain initial output + new lines + agent exit marker
3545        assert!(output.contains("line1"));
3546        assert!(output.contains("line2"));
3547        assert!(output.contains("line3"));
3548        assert!(output.contains("line4"));
3549        assert!(output.contains("[pulpo] Agent exited"));
3550    }
3551
3552    #[tokio::test]
3553    async fn test_execute_logs_follow_success() {
3554        let base = start_follow_test_server().await;
3555        // Extract host:port from http://127.0.0.1:PORT
3556        let node = base.strip_prefix("http://").unwrap().to_owned();
3557
3558        let cli = Cli {
3559            node,
3560            token: None,
3561            command: Some(Commands::Logs {
3562                name: "test".into(),
3563                lines: 100,
3564                follow: true,
3565            }),
3566            path: None,
3567        };
3568        // execute() with follow writes to stdout and returns empty string
3569        let result = execute(&cli).await.unwrap();
3570        assert_eq!(result, "");
3571    }
3572
3573    #[tokio::test]
3574    async fn test_execute_logs_follow_connection_refused() {
3575        let cli = Cli {
3576            node: "localhost:1".into(),
3577            token: None,
3578            command: Some(Commands::Logs {
3579                name: "test".into(),
3580                lines: 50,
3581                follow: true,
3582            }),
3583            path: None,
3584        };
3585        let result = execute(&cli).await;
3586        let err = result.unwrap_err().to_string();
3587        assert!(
3588            err.contains("Could not connect to pulpod"),
3589            "Expected friendly error, got: {err}"
3590        );
3591    }
3592
3593    #[tokio::test]
3594    async fn test_follow_logs_exits_on_dead() {
3595        use axum::{Router, extract::Path, extract::Query, routing::get};
3596
3597        let app = Router::new()
3598            .route(
3599                "/api/v1/sessions/{id}/output",
3600                get(
3601                    |_path: Path<String>,
3602                     _query: Query<std::collections::HashMap<String, String>>| async {
3603                        r#"{"output":"some output"}"#.to_owned()
3604                    },
3605                ),
3606            )
3607            .route(
3608                "/api/v1/sessions/{id}",
3609                get(|_path: Path<String>| async {
3610                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3611                }),
3612            );
3613
3614        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3615        let addr = listener.local_addr().unwrap();
3616        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3617        let base = format!("http://127.0.0.1:{}", addr.port());
3618
3619        let client = reqwest::Client::new();
3620        let mut buf = Vec::new();
3621        follow_logs(&client, &base, "test", 100, None, &mut buf)
3622            .await
3623            .unwrap();
3624
3625        let output = String::from_utf8(buf).unwrap();
3626        assert!(output.contains("some output"));
3627    }
3628
3629    #[tokio::test]
3630    async fn test_follow_logs_exits_on_stale() {
3631        use axum::{Router, extract::Path, extract::Query, routing::get};
3632
3633        let app = Router::new()
3634            .route(
3635                "/api/v1/sessions/{id}/output",
3636                get(
3637                    |_path: Path<String>,
3638                     _query: Query<std::collections::HashMap<String, String>>| async {
3639                        r#"{"output":"stale output"}"#.to_owned()
3640                    },
3641                ),
3642            )
3643            .route(
3644                "/api/v1/sessions/{id}",
3645                get(|_path: Path<String>| async {
3646                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3647                }),
3648            );
3649
3650        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3651        let addr = listener.local_addr().unwrap();
3652        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3653        let base = format!("http://127.0.0.1:{}", addr.port());
3654
3655        let client = reqwest::Client::new();
3656        let mut buf = Vec::new();
3657        follow_logs(&client, &base, "test", 100, None, &mut buf)
3658            .await
3659            .unwrap();
3660
3661        let output = String::from_utf8(buf).unwrap();
3662        assert!(output.contains("stale output"));
3663    }
3664
3665    #[tokio::test]
3666    async fn test_execute_logs_follow_non_reqwest_error() {
3667        use axum::{Router, extract::Path, extract::Query, routing::get};
3668
3669        // Session status endpoint returns invalid JSON to trigger a serde error
3670        let app = Router::new()
3671            .route(
3672                "/api/v1/sessions/{id}/output",
3673                get(
3674                    |_path: Path<String>,
3675                     _query: Query<std::collections::HashMap<String, String>>| async {
3676                        r#"{"output":"initial"}"#.to_owned()
3677                    },
3678                ),
3679            )
3680            .route(
3681                "/api/v1/sessions/{id}",
3682                get(|_path: Path<String>| async { "not valid json".to_owned() }),
3683            );
3684
3685        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3686        let addr = listener.local_addr().unwrap();
3687        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3688        let node = format!("127.0.0.1:{}", addr.port());
3689
3690        let cli = Cli {
3691            node,
3692            token: None,
3693            command: Some(Commands::Logs {
3694                name: "test".into(),
3695                lines: 100,
3696                follow: true,
3697            }),
3698            path: None,
3699        };
3700        let err = execute(&cli).await.unwrap_err();
3701        // serde_json error, not a reqwest error — hits the Err(other) branch
3702        let msg = err.to_string();
3703        assert!(
3704            msg.contains("expected ident"),
3705            "Expected serde parse error, got: {msg}"
3706        );
3707    }
3708
3709    #[tokio::test]
3710    async fn test_fetch_session_status_connection_error() {
3711        let client = reqwest::Client::new();
3712        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3713        assert!(result.is_err());
3714    }
3715
3716    // -- Schedule tests --
3717
3718    #[test]
3719    fn test_format_schedules_empty() {
3720        assert_eq!(format_schedules(&[]), "No schedules.");
3721    }
3722
3723    #[test]
3724    fn test_format_schedules_with_entries() {
3725        let schedules = vec![serde_json::json!({
3726            "name": "nightly",
3727            "cron": "0 3 * * *",
3728            "enabled": true,
3729            "last_run_at": null,
3730            "target_node": null
3731        })];
3732        let output = format_schedules(&schedules);
3733        assert!(output.contains("nightly"));
3734        assert!(output.contains("0 3 * * *"));
3735        assert!(output.contains("local"));
3736        assert!(output.contains("yes"));
3737        assert!(output.contains('-'));
3738    }
3739
3740    #[test]
3741    fn test_format_schedules_disabled_entry() {
3742        let schedules = vec![serde_json::json!({
3743            "name": "weekly",
3744            "cron": "0 0 * * 0",
3745            "enabled": false,
3746            "last_run_at": "2026-03-18T03:00:00Z",
3747            "target_node": "gpu-box"
3748        })];
3749        let output = format_schedules(&schedules);
3750        assert!(output.contains("weekly"));
3751        assert!(output.contains("no"));
3752        assert!(output.contains("gpu-box"));
3753        assert!(output.contains("2026-03-18T03:00"));
3754    }
3755
3756    #[test]
3757    fn test_format_schedules_header() {
3758        let schedules = vec![serde_json::json!({
3759            "name": "test",
3760            "cron": "* * * * *",
3761            "enabled": true,
3762            "last_run_at": null,
3763            "target_node": null
3764        })];
3765        let output = format_schedules(&schedules);
3766        assert!(output.contains("NAME"));
3767        assert!(output.contains("CRON"));
3768        assert!(output.contains("ENABLED"));
3769        assert!(output.contains("LAST RUN"));
3770        assert!(output.contains("NODE"));
3771    }
3772
3773    // -- Schedule CLI parse tests --
3774
3775    #[test]
3776    fn test_cli_parse_schedule_add() {
3777        let cli = Cli::try_parse_from([
3778            "pulpo",
3779            "schedule",
3780            "add",
3781            "nightly",
3782            "0 3 * * *",
3783            "--workdir",
3784            "/repo",
3785            "--",
3786            "claude",
3787            "-p",
3788            "review",
3789        ])
3790        .unwrap();
3791        assert!(matches!(
3792            &cli.command,
3793            Some(Commands::Schedule {
3794                action: ScheduleAction::Add { name, cron, .. }
3795            }) if name == "nightly" && cron == "0 3 * * *"
3796        ));
3797    }
3798
3799    #[test]
3800    fn test_cli_parse_schedule_add_with_node() {
3801        let cli = Cli::try_parse_from([
3802            "pulpo",
3803            "schedule",
3804            "add",
3805            "nightly",
3806            "0 3 * * *",
3807            "--workdir",
3808            "/repo",
3809            "--node",
3810            "gpu-box",
3811            "--",
3812            "claude",
3813        ])
3814        .unwrap();
3815        assert!(matches!(
3816            &cli.command,
3817            Some(Commands::Schedule {
3818                action: ScheduleAction::Add { node, .. }
3819            }) if node.as_deref() == Some("gpu-box")
3820        ));
3821    }
3822
3823    #[test]
3824    fn test_cli_parse_schedule_add_install_alias() {
3825        let cli =
3826            Cli::try_parse_from(["pulpo", "schedule", "install", "nightly", "0 3 * * *"]).unwrap();
3827        assert!(matches!(
3828            &cli.command,
3829            Some(Commands::Schedule {
3830                action: ScheduleAction::Add { name, .. }
3831            }) if name == "nightly"
3832        ));
3833    }
3834
3835    #[test]
3836    fn test_cli_parse_schedule_list() {
3837        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3838        assert!(matches!(
3839            &cli.command,
3840            Some(Commands::Schedule {
3841                action: ScheduleAction::List
3842            })
3843        ));
3844    }
3845
3846    #[test]
3847    fn test_cli_parse_schedule_remove() {
3848        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3849        assert!(matches!(
3850            &cli.command,
3851            Some(Commands::Schedule {
3852                action: ScheduleAction::Remove { name }
3853            }) if name == "nightly"
3854        ));
3855    }
3856
3857    #[test]
3858    fn test_cli_parse_schedule_pause() {
3859        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3860        assert!(matches!(
3861            &cli.command,
3862            Some(Commands::Schedule {
3863                action: ScheduleAction::Pause { name }
3864            }) if name == "nightly"
3865        ));
3866    }
3867
3868    #[test]
3869    fn test_cli_parse_schedule_resume() {
3870        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3871        assert!(matches!(
3872            &cli.command,
3873            Some(Commands::Schedule {
3874                action: ScheduleAction::Resume { name }
3875            }) if name == "nightly"
3876        ));
3877    }
3878
3879    #[test]
3880    fn test_cli_parse_schedule_alias() {
3881        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3882        assert!(matches!(
3883            &cli.command,
3884            Some(Commands::Schedule {
3885                action: ScheduleAction::List
3886            })
3887        ));
3888    }
3889
3890    #[test]
3891    fn test_cli_parse_schedule_list_alias() {
3892        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3893        assert!(matches!(
3894            &cli.command,
3895            Some(Commands::Schedule {
3896                action: ScheduleAction::List
3897            })
3898        ));
3899    }
3900
3901    #[test]
3902    fn test_cli_parse_schedule_remove_alias() {
3903        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3904        assert!(matches!(
3905            &cli.command,
3906            Some(Commands::Schedule {
3907                action: ScheduleAction::Remove { name }
3908            }) if name == "nightly"
3909        ));
3910    }
3911
3912    #[tokio::test]
3913    async fn test_execute_schedule_list_via_execute() {
3914        let node = start_test_server().await;
3915        let cli = Cli {
3916            node,
3917            token: None,
3918            command: Some(Commands::Schedule {
3919                action: ScheduleAction::List,
3920            }),
3921            path: None,
3922        };
3923        let result = execute(&cli).await.unwrap();
3924        // Under coverage, execute_schedule is a stub that returns empty string
3925        #[cfg(coverage)]
3926        assert!(result.is_empty());
3927        #[cfg(not(coverage))]
3928        assert_eq!(result, "No schedules.");
3929    }
3930
3931    #[test]
3932    fn test_schedule_action_debug() {
3933        let action = ScheduleAction::List;
3934        assert_eq!(format!("{action:?}"), "List");
3935    }
3936
3937    #[test]
3938    fn test_cli_parse_send_alias() {
3939        let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
3940        assert!(matches!(
3941            &cli.command,
3942            Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
3943        ));
3944    }
3945
3946    #[test]
3947    fn test_cli_parse_spawn_no_name() {
3948        let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
3949        assert!(matches!(
3950            &cli.command,
3951            Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
3952        ));
3953    }
3954
3955    #[test]
3956    fn test_cli_parse_spawn_optional_name_with_command() {
3957        let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
3958        assert!(matches!(
3959            &cli.command,
3960            Some(Commands::Spawn { name, command, .. })
3961                if name.is_none() && command == &["echo", "hello"]
3962        ));
3963    }
3964
3965    #[test]
3966    fn test_cli_parse_path_shortcut() {
3967        let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
3968        assert!(cli.command.is_none());
3969        assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
3970    }
3971
3972    #[test]
3973    fn test_cli_parse_no_args() {
3974        let cli = Cli::try_parse_from(["pulpo"]).unwrap();
3975        assert!(cli.command.is_none());
3976        assert!(cli.path.is_none());
3977    }
3978
3979    #[test]
3980    fn test_derive_session_name_simple() {
3981        assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
3982    }
3983
3984    #[test]
3985    fn test_derive_session_name_with_special_chars() {
3986        assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
3987    }
3988
3989    #[test]
3990    fn test_derive_session_name_root() {
3991        assert_eq!(derive_session_name("/"), "session");
3992    }
3993
3994    #[test]
3995    fn test_derive_session_name_dots() {
3996        assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
3997    }
3998
3999    #[test]
4000    fn test_resolve_path_absolute() {
4001        assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
4002    }
4003
4004    #[test]
4005    fn test_resolve_path_relative() {
4006        let resolved = resolve_path("my-repo");
4007        assert!(resolved.ends_with("my-repo"));
4008        assert!(resolved.starts_with('/'));
4009    }
4010
4011    #[tokio::test]
4012    async fn test_execute_no_args_shows_help() {
4013        let node = start_test_server().await;
4014        let cli = Cli {
4015            node,
4016            token: None,
4017            path: None,
4018            command: None,
4019        };
4020        let result = execute(&cli).await.unwrap();
4021        assert!(
4022            result.is_empty(),
4023            "no-args should return empty string after printing help"
4024        );
4025    }
4026
4027    #[tokio::test]
4028    async fn test_execute_path_shortcut() {
4029        let node = start_test_server().await;
4030        let cli = Cli {
4031            node,
4032            token: None,
4033            path: Some("/tmp".into()),
4034            command: None,
4035        };
4036        let result = execute(&cli).await.unwrap();
4037        assert!(result.contains("Detached from session"));
4038    }
4039
4040    #[tokio::test]
4041    async fn test_deduplicate_session_name_no_conflict() {
4042        // Connection refused → falls through to "name not taken" path
4043        let base = "http://127.0.0.1:1";
4044        let client = reqwest::Client::new();
4045        let name = deduplicate_session_name(&client, base, "fresh", None).await;
4046        assert_eq!(name, "fresh");
4047    }
4048
4049    #[tokio::test]
4050    async fn test_deduplicate_session_name_with_conflict() {
4051        use axum::{Router, routing::get};
4052        use std::sync::atomic::{AtomicU32, Ordering};
4053
4054        let call_count = std::sync::Arc::new(AtomicU32::new(0));
4055        let counter = call_count.clone();
4056        let app = Router::new()
4057            .route(
4058                "/api/v1/sessions/{id}",
4059                get(move || {
4060                    let c = counter.clone();
4061                    async move {
4062                        let n = c.fetch_add(1, Ordering::SeqCst);
4063                        if n == 0 {
4064                            // First call (base name) → exists
4065                            (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4066                        } else {
4067                            // Suffixed name → not found
4068                            (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4069                        }
4070                    }
4071                }),
4072            )
4073            .route(
4074                "/api/v1/peers",
4075                get(|| async {
4076                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4077                }),
4078            );
4079        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4080        let addr = listener.local_addr().unwrap();
4081        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4082        let base = format!("http://127.0.0.1:{}", addr.port());
4083        let client = reqwest::Client::new();
4084        let name = deduplicate_session_name(&client, &base, "repo", None).await;
4085        assert_eq!(name, "repo-2");
4086    }
4087
4088    // -- Node resolution tests --
4089
4090    #[test]
4091    fn test_node_needs_resolution() {
4092        assert!(!node_needs_resolution("localhost:7433"));
4093        assert!(!node_needs_resolution("mac-mini:7433"));
4094        assert!(!node_needs_resolution("10.0.0.1:7433"));
4095        assert!(!node_needs_resolution("[::1]:7433"));
4096        assert!(node_needs_resolution("mac-mini"));
4097        assert!(node_needs_resolution("linux-server"));
4098        assert!(node_needs_resolution("localhost"));
4099    }
4100
4101    #[tokio::test]
4102    async fn test_resolve_node_with_port() {
4103        let client = reqwest::Client::new();
4104        let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4105        assert_eq!(addr, "mac-mini:7433");
4106        assert!(token.is_none());
4107    }
4108
4109    #[tokio::test]
4110    async fn test_resolve_node_fallback_appends_port() {
4111        // No local daemon running on localhost:7433, so peer lookup fails
4112        // and it falls back to appending :7433
4113        let client = reqwest::Client::new();
4114        let (addr, token) = resolve_node(&client, "unknown-host").await;
4115        assert_eq!(addr, "unknown-host:7433");
4116        assert!(token.is_none());
4117    }
4118
4119    #[cfg(not(coverage))]
4120    #[tokio::test]
4121    async fn test_resolve_node_finds_peer() {
4122        use axum::{Router, routing::get};
4123
4124        let app = Router::new()
4125            .route(
4126                "/api/v1/peers",
4127                get(|| async {
4128                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4129                }),
4130            )
4131            .route(
4132                "/api/v1/config",
4133                get(|| async {
4134                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4135                }),
4136            );
4137
4138        // Port 7433 may be in use; skip test if so
4139        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4140            return;
4141        };
4142        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4143
4144        let client = reqwest::Client::new();
4145        let (addr, token) = resolve_node(&client, "mac-mini").await;
4146        assert_eq!(addr, "10.0.0.5:7433");
4147        assert_eq!(token, Some("peer-secret".into()));
4148    }
4149
4150    #[cfg(not(coverage))]
4151    #[tokio::test]
4152    async fn test_resolve_node_peer_no_token() {
4153        use axum::{Router, routing::get};
4154
4155        let app = Router::new()
4156            .route(
4157                "/api/v1/peers",
4158                get(|| async {
4159                    r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4160                }),
4161            )
4162            .route(
4163                "/api/v1/config",
4164                get(|| async {
4165                    r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4166                }),
4167            );
4168
4169        let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4170            return; // Port in use, skip
4171        };
4172        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4173
4174        let client = reqwest::Client::new();
4175        let (addr, token) = resolve_node(&client, "test-peer").await;
4176        assert_eq!(addr, "10.0.0.9:7433");
4177        assert!(token.is_none()); // Simple peer entry has no token
4178    }
4179
4180    #[tokio::test]
4181    async fn test_execute_with_peer_name_resolution() {
4182        // When node doesn't contain ':', resolve_node is called.
4183        // Since there's no local daemon on port 7433, it falls back to appending :7433.
4184        // The connection to the fallback address will fail, giving us a connection error.
4185        let cli = Cli {
4186            node: "nonexistent-peer".into(),
4187            token: None,
4188            command: Some(Commands::List { all: false }),
4189            path: None,
4190        };
4191        let result = execute(&cli).await;
4192        // Should try to connect to nonexistent-peer:7433 and fail
4193        assert!(result.is_err());
4194    }
4195
4196    // -- Auto node selection tests --
4197
4198    #[test]
4199    fn test_cli_parse_spawn_auto() {
4200        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
4201        assert!(matches!(
4202            &cli.command,
4203            Some(Commands::Spawn { auto, .. }) if *auto
4204        ));
4205    }
4206
4207    #[test]
4208    fn test_cli_parse_spawn_auto_default() {
4209        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
4210        assert!(matches!(
4211            &cli.command,
4212            Some(Commands::Spawn { auto, .. }) if !auto
4213        ));
4214    }
4215
4216    #[tokio::test]
4217    async fn test_select_best_node_coverage_stub() {
4218        // Exercise the coverage stub (or real function in non-coverage builds)
4219        let client = reqwest::Client::new();
4220        // In coverage builds, the stub returns ("localhost:7433", "local")
4221        // In non-coverage builds, this fails because no server is running — that's OK
4222        let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4223    }
4224
4225    #[cfg(not(coverage))]
4226    #[tokio::test]
4227    async fn test_select_best_node_picks_least_loaded() {
4228        use axum::{Router, routing::get};
4229
4230        let app = Router::new().route(
4231            "/api/v1/peers",
4232            get(|| async {
4233                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4234            }),
4235        );
4236        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4237        let addr = listener.local_addr().unwrap();
4238        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4239        let base = format!("http://127.0.0.1:{}", addr.port());
4240
4241        let client = reqwest::Client::new();
4242        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4243        // "idle" has 1 session + 16384 MB → score = 1 - 16 = -15
4244        // "busy" has 5 sessions + 8192 MB → score = 5 - 8 = -3
4245        // idle wins (lower score)
4246        assert_eq!(name, "idle");
4247        assert_eq!(addr, "idle:7433");
4248    }
4249
4250    #[cfg(not(coverage))]
4251    #[tokio::test]
4252    async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4253        use axum::{Router, routing::get};
4254
4255        let app = Router::new().route(
4256            "/api/v1/peers",
4257            get(|| async {
4258                r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4259            }),
4260        );
4261        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4262        let addr = listener.local_addr().unwrap();
4263        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4264        let base = format!("http://127.0.0.1:{}", addr.port());
4265
4266        let client = reqwest::Client::new();
4267        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4268        assert_eq!(name, "my-mac");
4269        assert_eq!(addr, "localhost:7433");
4270    }
4271
4272    #[cfg(not(coverage))]
4273    #[tokio::test]
4274    async fn test_select_best_node_empty_peers_falls_back_to_local() {
4275        use axum::{Router, routing::get};
4276
4277        let app = Router::new().route(
4278            "/api/v1/peers",
4279            get(|| async {
4280                r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4281            }),
4282        );
4283        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4284        let addr = listener.local_addr().unwrap();
4285        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4286        let base = format!("http://127.0.0.1:{}", addr.port());
4287
4288        let client = reqwest::Client::new();
4289        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4290        assert_eq!(name, "solo");
4291        assert_eq!(addr, "localhost:7433");
4292    }
4293
4294    #[cfg(not(coverage))]
4295    #[tokio::test]
4296    async fn test_execute_spawn_auto_selects_node() {
4297        use axum::{
4298            Router,
4299            http::StatusCode,
4300            routing::{get, post},
4301        };
4302
4303        let create_json = test_create_response_json();
4304
4305        // Bind early so we know the address to embed in the peers response
4306        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4307        let addr = listener.local_addr().unwrap();
4308        let node = format!("127.0.0.1:{}", addr.port());
4309        let peer_addr = node.clone();
4310
4311        let app = Router::new()
4312            .route(
4313                "/api/v1/peers",
4314                get(move || {
4315                    let peer_addr = peer_addr.clone();
4316                    async move {
4317                        format!(
4318                            r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
4319                        )
4320                    }
4321                }),
4322            )
4323            .route(
4324                "/api/v1/sessions",
4325                post(move || async move {
4326                    (StatusCode::CREATED, create_json.clone())
4327                }),
4328            );
4329
4330        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4331
4332        let cli = Cli {
4333            node,
4334            token: None,
4335            command: Some(Commands::Spawn {
4336                name: Some("test".into()),
4337                workdir: Some("/tmp/repo".into()),
4338                ink: None,
4339                description: None,
4340                detach: true,
4341                idle_threshold: None,
4342                auto: true,
4343                worktree: false,
4344                runtime: None,
4345                secret: vec![],
4346                command: vec!["echo".into(), "hello".into()],
4347            }),
4348            path: None,
4349        };
4350        let result = execute(&cli).await.unwrap();
4351        assert!(result.contains("Created session"));
4352    }
4353
4354    #[cfg(not(coverage))]
4355    #[tokio::test]
4356    async fn test_select_best_node_peer_no_session_count() {
4357        use axum::{Router, routing::get};
4358
4359        let app = Router::new().route(
4360            "/api/v1/peers",
4361            get(|| async {
4362                r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4363            }),
4364        );
4365        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4366        let addr = listener.local_addr().unwrap();
4367        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4368        let base = format!("http://127.0.0.1:{}", addr.port());
4369
4370        let client = reqwest::Client::new();
4371        let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4372        // Online peer with no session_count (0) and no node_info (0 mem) → score = 0
4373        assert_eq!(name, "fresh");
4374        assert_eq!(addr, "fresh:7433");
4375    }
4376
4377    // -- Secret CLI parse tests --
4378
/// `pulpo secret set MY_TOKEN abc123` must parse into `SecretAction::Set` carrying the
/// given name and value, with no `env` set.
#[test]
fn test_cli_parse_secret_set() {
    let argv = ["pulpo", "secret", "set", "MY_TOKEN", "abc123"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert!(matches!(
        &parsed.command,
        Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
            if name == "MY_TOKEN" && value == "abc123" && env.is_none()
    ));
}
4388
/// `pulpo secret list` must parse into `SecretAction::List`.
#[test]
fn test_cli_parse_secret_list() {
    let argv = ["pulpo", "secret", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert!(matches!(
        &parsed.command,
        Some(Commands::Secret {
            action: SecretAction::List
        })
    ));
}
4399
/// The `ls` alias (`pulpo secret ls`) must parse into `SecretAction::List`.
#[test]
fn test_cli_parse_secret_list_alias() {
    let argv = ["pulpo", "secret", "ls"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert!(matches!(
        &parsed.command,
        Some(Commands::Secret {
            action: SecretAction::List
        })
    ));
}
4410
/// `pulpo secret delete MY_TOKEN` must parse into `SecretAction::Delete` with that name.
#[test]
fn test_cli_parse_secret_delete() {
    let argv = ["pulpo", "secret", "delete", "MY_TOKEN"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert!(matches!(
        &parsed.command,
        Some(Commands::Secret { action: SecretAction::Delete { name } })
            if name == "MY_TOKEN"
    ));
}
4420
/// The `rm` alias (`pulpo secret rm MY_TOKEN`) must parse into `SecretAction::Delete`
/// with that name.
#[test]
fn test_cli_parse_secret_delete_alias() {
    let argv = ["pulpo", "secret", "rm", "MY_TOKEN"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert!(matches!(
        &parsed.command,
        Some(Commands::Secret { action: SecretAction::Delete { name } })
            if name == "MY_TOKEN"
    ));
}
4430
/// The `sec` alias for the `secret` subcommand (`pulpo sec list`) must parse into
/// `SecretAction::List`.
#[test]
fn test_cli_parse_secret_alias() {
    let argv = ["pulpo", "sec", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert!(matches!(
        &parsed.command,
        Some(Commands::Secret {
            action: SecretAction::List
        })
    ));
}
4441
/// An empty secret list must render as the "No secrets configured." message.
#[test]
fn test_format_secrets_empty() {
    let no_secrets = Vec::<serde_json::Value>::new();
    let rendered = format_secrets(&no_secrets);
    assert_eq!(rendered, "No secrets configured.");
}
4447
/// A non-empty secret list must render every secret name as well as the `NAME`, `ENV`
/// and `CREATED` column headers.
#[test]
fn test_format_secrets_with_entries() {
    let github = serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"});
    let npm = serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"});
    let entries = vec![github, npm];

    let rendered = format_secrets(&entries);

    // Secret names.
    assert!(rendered.contains("GITHUB_TOKEN"));
    assert!(rendered.contains("NPM_TOKEN"));
    // Column headers.
    assert!(rendered.contains("NAME"));
    assert!(rendered.contains("ENV"));
    assert!(rendered.contains("CREATED"));
}
4461
/// When a secret carries an `env` value, the rendered output must contain both the
/// secret's name and that `env` value, alongside secrets without one.
#[test]
fn test_format_secrets_with_env() {
    let with_env = serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"});
    let without_env = serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"});
    let entries = vec![with_env, without_env];

    let rendered = format_secrets(&entries);

    assert!(rendered.contains("GH_WORK"));
    assert!(rendered.contains("GITHUB_TOKEN"));
    assert!(rendered.contains("NPM_TOKEN"));
}
4473
/// A very short `created_at` value ("now") must appear in the rendered output.
#[test]
fn test_format_secrets_short_timestamp() {
    let entry = serde_json::json!({"name": "KEY", "created_at": "now"});
    let entries = vec![entry];
    let rendered = format_secrets(&entries);
    assert!(rendered.contains("now"));
}
4480
/// Regression test: a `last_run_at` value shorter than 16 characters must not panic and
/// must appear in the rendered output.
#[test]
fn test_format_schedules_short_last_run_at() {
    let entry = serde_json::json!({
        "name": "test",
        "cron": "* * * * *",
        "enabled": true,
        "last_run_at": "short",
        "target_node": null
    });
    let entries = vec![entry];

    let rendered = format_schedules(&entries);

    assert!(rendered.contains("short"));
}
4494
/// A session whose command is made of multi-byte characters and exceeds 50 bytes must be
/// rendered without panicking, and the output must contain the "..." marker.
#[test]
fn test_format_sessions_multibyte_command_truncation() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    // Each emoji is a multi-byte character, so the command is well past 50 bytes.
    let emoji_command = "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'";

    let session = Session {
        id: Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        command: emoji_command.into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let listing = vec![session];

    let rendered = format_sessions(&listing);
    assert!(rendered.contains("..."));
}
4528}