Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{AuthTokenResponse, InterventionEventResponse, PeersResponse};
4use pulpo_common::session::Session;
5
6#[derive(Parser, Debug)]
7#[command(
8    name = "pulpo",
9    about = "Manage agent sessions across your machines",
10    version
11)]
12pub struct Cli {
13    /// Target node (default: localhost)
14    #[arg(long, default_value = "localhost:7433")]
15    pub node: String,
16
17    /// Auth token (auto-discovered from local daemon if omitted)
18    #[arg(long)]
19    pub token: Option<String>,
20
21    #[command(subcommand)]
22    pub command: Commands,
23}
24
25#[derive(Subcommand, Debug)]
26#[allow(clippy::large_enum_variant)]
27pub enum Commands {
28    /// Attach to a session's terminal
29    #[command(visible_alias = "a")]
30    Attach {
31        /// Session name or ID
32        name: String,
33    },
34
35    /// Send input to a session
36    #[command(visible_alias = "i")]
37    Input {
38        /// Session name or ID
39        name: String,
40        /// Text to send (sends Enter if omitted)
41        text: Option<String>,
42    },
43
44    /// Spawn a new agent session
45    #[command(visible_alias = "s")]
46    Spawn {
47        /// Working directory
48        #[arg(long)]
49        workdir: String,
50
51        /// Session name (auto-derived from workdir if omitted)
52        #[arg(long)]
53        name: Option<String>,
54
55        /// Agent provider (claude, codex, gemini, opencode)
56        #[arg(long, default_value = "claude")]
57        provider: String,
58
59        /// Run in autonomous mode (fire-and-forget)
60        #[arg(long)]
61        auto: bool,
62
63        /// Guard preset (strict, standard, unrestricted)
64        #[arg(long, default_value = "standard")]
65        guard: String,
66
67        /// Model override (e.g. opus, sonnet)
68        #[arg(long)]
69        model: Option<String>,
70
71        /// System prompt to append
72        #[arg(long)]
73        system_prompt: Option<String>,
74
75        /// Explicit allowed tools (comma-separated)
76        #[arg(long, value_delimiter = ',')]
77        allowed_tools: Option<Vec<String>>,
78
79        /// Ink name (from config)
80        #[arg(long)]
81        ink: Option<String>,
82
83        /// Maximum agent turns before stopping
84        #[arg(long)]
85        max_turns: Option<u32>,
86
87        /// Maximum budget in USD before stopping
88        #[arg(long)]
89        max_budget: Option<f64>,
90
91        /// Output format (e.g. json, stream-json)
92        #[arg(long)]
93        output_format: Option<String>,
94
95        /// Task prompt
96        prompt: Vec<String>,
97    },
98
99    /// List all sessions
100    #[command(visible_alias = "ls")]
101    List,
102
103    /// Show session logs/output
104    #[command(visible_alias = "l")]
105    Logs {
106        /// Session name or ID
107        name: String,
108
109        /// Number of lines to fetch
110        #[arg(long, default_value = "100")]
111        lines: usize,
112
113        /// Follow output (like `tail -f`)
114        #[arg(short, long)]
115        follow: bool,
116    },
117
118    /// Kill a session
119    #[command(visible_alias = "k")]
120    Kill {
121        /// Session name or ID
122        name: String,
123    },
124
125    /// Permanently remove a session from history
126    #[command(visible_alias = "rm")]
127    Delete {
128        /// Session name or ID
129        name: String,
130    },
131
132    /// Resume a stale session
133    #[command(visible_alias = "r")]
134    Resume {
135        /// Session name or ID
136        name: String,
137    },
138
139    /// List all known nodes
140    #[command(visible_alias = "n")]
141    Nodes,
142
143    /// Show intervention history for a session
144    #[command(visible_alias = "iv")]
145    Interventions {
146        /// Session name or ID
147        name: String,
148    },
149
150    /// Open the web dashboard in your browser
151    Ui,
152
153    /// Manage scheduled agent runs via crontab
154    #[command(visible_alias = "sched")]
155    Schedule {
156        #[command(subcommand)]
157        action: ScheduleAction,
158    },
159}
160
161#[derive(Subcommand, Debug)]
162pub enum ScheduleAction {
163    /// Install a cron schedule that spawns a session
164    Install {
165        /// Schedule name
166        name: String,
167        /// Cron expression (e.g. "0 3 * * *")
168        cron: String,
169        /// Working directory
170        #[arg(long)]
171        workdir: String,
172        /// Agent provider
173        #[arg(long, default_value = "claude")]
174        provider: String,
175        /// Task prompt
176        prompt: Vec<String>,
177    },
178    /// List installed pulpo cron schedules
179    #[command(alias = "ls")]
180    List,
181    /// Remove a cron schedule
182    #[command(alias = "rm")]
183    Remove {
184        /// Schedule name
185        name: String,
186    },
187    /// Pause a cron schedule (comments out the line)
188    Pause {
189        /// Schedule name
190        name: String,
191    },
192    /// Resume a paused cron schedule (uncomments the line)
193    Resume {
194        /// Schedule name
195        name: String,
196    },
197}
198
199/// Format the base URL from the node address.
200pub fn base_url(node: &str) -> String {
201    format!("http://{node}")
202}
203
204/// Response shape for the output endpoint.
205#[derive(serde::Deserialize)]
206struct OutputResponse {
207    output: String,
208}
209
210/// Format a list of sessions as a table.
211fn format_sessions(sessions: &[Session]) -> String {
212    if sessions.is_empty() {
213        return "No sessions.".into();
214    }
215    let mut lines = vec![format!(
216        "{:<20} {:<12} {:<10} {:<14} {}",
217        "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
218    )];
219    for s in sessions {
220        let prompt_display = if s.prompt.len() > 40 {
221            format!("{}...", &s.prompt[..37])
222        } else {
223            s.prompt.clone()
224        };
225        let status_display = if s.waiting_for_input {
226            "waiting".to_owned()
227        } else {
228            s.status.to_string()
229        };
230        lines.push(format!(
231            "{:<20} {:<12} {:<10} {:<14} {}",
232            s.name, status_display, s.provider, s.mode, prompt_display
233        ));
234    }
235    lines.join("\n")
236}
237
238/// Format the peers response as a table.
239fn format_nodes(resp: &PeersResponse) -> String {
240    let mut lines = vec![format!(
241        "{:<20} {:<25} {:<10} {}",
242        "NAME", "ADDRESS", "STATUS", "SESSIONS"
243    )];
244    lines.push(format!(
245        "{:<20} {:<25} {:<10} {}",
246        resp.local.name, "(local)", "online", "-"
247    ));
248    for p in &resp.peers {
249        let sessions = p
250            .session_count
251            .map_or_else(|| "-".into(), |c| c.to_string());
252        lines.push(format!(
253            "{:<20} {:<25} {:<10} {}",
254            p.name, p.address, p.status, sessions
255        ));
256    }
257    lines.join("\n")
258}
259
260/// Format intervention events as a table.
261fn format_interventions(events: &[InterventionEventResponse]) -> String {
262    if events.is_empty() {
263        return "No intervention events.".into();
264    }
265    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
266    for e in events {
267        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
268    }
269    lines.join("\n")
270}
271
272/// Build the command to open a URL in the default browser.
273#[cfg_attr(coverage, allow(dead_code))]
274fn build_open_command(url: &str) -> std::process::Command {
275    #[cfg(target_os = "macos")]
276    {
277        let mut cmd = std::process::Command::new("open");
278        cmd.arg(url);
279        cmd
280    }
281    #[cfg(target_os = "linux")]
282    {
283        let mut cmd = std::process::Command::new("xdg-open");
284        cmd.arg(url);
285        cmd
286    }
287    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
288    {
289        // Fallback: try xdg-open
290        let mut cmd = std::process::Command::new("xdg-open");
291        cmd.arg(url);
292        cmd
293    }
294}
295
296/// Open a URL in the default browser.
297#[cfg(not(coverage))]
298fn open_browser(url: &str) -> Result<()> {
299    build_open_command(url).status()?;
300    Ok(())
301}
302
303/// Stub for coverage builds — avoids opening a browser during tests.
304#[cfg(coverage)]
305fn open_browser(_url: &str) -> Result<()> {
306    Ok(())
307}
308
309/// Build the command to attach to a session's terminal.
310/// Takes the backend session ID (e.g. `pulpo-my-session`) — the tmux session name.
311#[cfg_attr(coverage, allow(dead_code))]
312fn build_attach_command(backend_session_id: &str) -> std::process::Command {
313    let mut cmd = std::process::Command::new("tmux");
314    cmd.args(["attach-session", "-t", backend_session_id]);
315    cmd
316}
317
318/// Attach to a session's terminal.
319#[cfg(not(any(test, coverage)))]
320fn attach_session(backend_session_id: &str) -> Result<()> {
321    let status = build_attach_command(backend_session_id).status()?;
322    if !status.success() {
323        anyhow::bail!("attach failed with {status}");
324    }
325    Ok(())
326}
327
328/// Stub for test and coverage builds — avoids spawning real terminals during tests.
329#[cfg(any(test, coverage))]
330#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
331fn attach_session(_backend_session_id: &str) -> Result<()> {
332    Ok(())
333}
334
335/// Extract a clean error message from an API JSON response (or fall back to raw text).
336fn api_error(text: &str) -> anyhow::Error {
337    serde_json::from_str::<serde_json::Value>(text)
338        .ok()
339        .and_then(|v| v["error"].as_str().map(String::from))
340        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
341}
342
343/// Return the response body text, or a clean error if the response was non-success.
344async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
345    if resp.status().is_success() {
346        Ok(resp.text().await?)
347    } else {
348        let text = resp.text().await?;
349        Err(api_error(&text))
350    }
351}
352
353/// Map a reqwest error to a user-friendly message.
354fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
355    if err.is_connect() {
356        anyhow::anyhow!(
357            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
358        )
359    } else {
360        anyhow::anyhow!("Network error connecting to {node}: {err}")
361    }
362}
363
364/// Check if the node address points to localhost.
365fn is_localhost(node: &str) -> bool {
366    let host = node.split(':').next().unwrap_or(node);
367    host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
368}
369
370/// Try to auto-discover the auth token from a local daemon.
371async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
372    let resp = client
373        .get(format!("{base}/api/v1/auth/token"))
374        .send()
375        .await
376        .ok()?;
377    let body: AuthTokenResponse = resp.json().await.ok()?;
378    if body.token.is_empty() {
379        None
380    } else {
381        Some(body.token)
382    }
383}
384
385/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
386async fn resolve_token(
387    client: &reqwest::Client,
388    base: &str,
389    node: &str,
390    explicit: Option<&str>,
391) -> Option<String> {
392    if let Some(t) = explicit {
393        return Some(t.to_owned());
394    }
395    if is_localhost(node) {
396        return discover_token(client, base).await;
397    }
398    None
399}
400
401/// Build an authenticated GET request.
402fn authed_get(
403    client: &reqwest::Client,
404    url: String,
405    token: Option<&str>,
406) -> reqwest::RequestBuilder {
407    let req = client.get(url);
408    if let Some(t) = token {
409        req.bearer_auth(t)
410    } else {
411        req
412    }
413}
414
415/// Build an authenticated POST request.
416fn authed_post(
417    client: &reqwest::Client,
418    url: String,
419    token: Option<&str>,
420) -> reqwest::RequestBuilder {
421    let req = client.post(url);
422    if let Some(t) = token {
423        req.bearer_auth(t)
424    } else {
425        req
426    }
427}
428
429/// Build an authenticated DELETE request.
430fn authed_delete(
431    client: &reqwest::Client,
432    url: String,
433    token: Option<&str>,
434) -> reqwest::RequestBuilder {
435    let req = client.delete(url);
436    if let Some(t) = token {
437        req.bearer_auth(t)
438    } else {
439        req
440    }
441}
442
443/// Fetch session output from the API.
444async fn fetch_output(
445    client: &reqwest::Client,
446    base: &str,
447    name: &str,
448    lines: usize,
449    token: Option<&str>,
450) -> Result<String> {
451    let resp = authed_get(
452        client,
453        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
454        token,
455    )
456    .send()
457    .await?;
458    let text = ok_or_api_error(resp).await?;
459    let output: OutputResponse = serde_json::from_str(&text)?;
460    Ok(output.output)
461}
462
463/// Fetch session status from the API.
464async fn fetch_session_status(
465    client: &reqwest::Client,
466    base: &str,
467    name: &str,
468    token: Option<&str>,
469) -> Result<String> {
470    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
471        .send()
472        .await?;
473    let text = ok_or_api_error(resp).await?;
474    let session: Session = serde_json::from_str(&text)?;
475    Ok(session.status.to_string())
476}
477
478/// Compute the new trailing lines that differ from the previous output.
479///
480/// The output endpoint returns the last N lines from the terminal pane. As new lines
481/// appear, old lines at the top scroll off. We find the overlap between the end
482/// of `prev` and the beginning-to-middle of `new`, then return only the truly new
483/// trailing lines.
484fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
485    if prev.is_empty() {
486        return new;
487    }
488
489    let prev_lines: Vec<&str> = prev.lines().collect();
490    let new_lines: Vec<&str> = new.lines().collect();
491
492    if new_lines.is_empty() {
493        return "";
494    }
495
496    // prev is non-empty (early return above), so last() always succeeds
497    let last_prev = prev_lines[prev_lines.len() - 1];
498
499    // Find the last line of prev in new to determine the overlap boundary
500    for i in (0..new_lines.len()).rev() {
501        if new_lines[i] == last_prev {
502            // Verify contiguous overlap: check that lines before this match too
503            let overlap_len = prev_lines.len().min(i + 1);
504            let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
505            let new_overlap = &new_lines[i + 1 - overlap_len..=i];
506            if prev_tail == new_overlap {
507                if i + 1 < new_lines.len() {
508                    // Return the slice of `new` after the overlap
509                    let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
510                    return new.get(consumed.min(new.len())..).unwrap_or("");
511                }
512                return "";
513            }
514        }
515    }
516
517    // No overlap found — output changed completely, print it all
518    new
519}
520
521/// Follow logs by polling, printing only new output. Returns when the session ends.
522async fn follow_logs(
523    client: &reqwest::Client,
524    base: &str,
525    name: &str,
526    lines: usize,
527    token: Option<&str>,
528    writer: &mut (dyn std::io::Write + Send),
529) -> Result<()> {
530    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
531    write!(writer, "{prev_output}")?;
532
533    loop {
534        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
535
536        // Check session status
537        let status = fetch_session_status(client, base, name, token).await?;
538        let is_terminal = status == "completed" || status == "dead" || status == "stale";
539
540        // Fetch latest output
541        let new_output = fetch_output(client, base, name, lines, token).await?;
542
543        let diff = diff_output(&prev_output, &new_output);
544        if !diff.is_empty() {
545            write!(writer, "{diff}")?;
546        }
547        prev_output = new_output;
548
549        if is_terminal {
550            break;
551        }
552    }
553    Ok(())
554}
555
556// --- Crontab wrapper ---
557
558#[cfg_attr(coverage, allow(dead_code))]
559const CRONTAB_TAG: &str = "#pulpo:";
560
561/// Read the current crontab. Returns empty string if no crontab exists.
562#[cfg(not(coverage))]
563fn read_crontab() -> Result<String> {
564    let output = std::process::Command::new("crontab").arg("-l").output()?;
565    if output.status.success() {
566        Ok(String::from_utf8_lossy(&output.stdout).to_string())
567    } else {
568        Ok(String::new())
569    }
570}
571
572/// Write the given content as the user's crontab.
573#[cfg(not(coverage))]
574fn write_crontab(content: &str) -> Result<()> {
575    use std::io::Write;
576    let mut child = std::process::Command::new("crontab")
577        .arg("-")
578        .stdin(std::process::Stdio::piped())
579        .spawn()?;
580    child
581        .stdin
582        .as_mut()
583        .unwrap()
584        .write_all(content.as_bytes())?;
585    let status = child.wait()?;
586    if !status.success() {
587        anyhow::bail!("crontab write failed");
588    }
589    Ok(())
590}
591
592/// Build the crontab line for a pulpo schedule.
593#[cfg_attr(coverage, allow(dead_code))]
594fn build_crontab_line(
595    name: &str,
596    cron: &str,
597    workdir: &str,
598    provider: &str,
599    prompt: &str,
600    node: &str,
601) -> String {
602    format!(
603        "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
604    )
605}
606
607/// Install a cron schedule into a crontab string. Returns the updated crontab.
608#[cfg_attr(coverage, allow(dead_code))]
609fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
610    let tag = format!("{CRONTAB_TAG}{name}");
611    if crontab.contains(&tag) {
612        anyhow::bail!("schedule \"{name}\" already exists — remove it first");
613    }
614    let mut result = crontab.to_owned();
615    result.push_str(line);
616    Ok(result)
617}
618
619/// Format pulpo crontab entries for display.
620#[cfg_attr(coverage, allow(dead_code))]
621fn crontab_list(crontab: &str) -> String {
622    let entries: Vec<&str> = crontab
623        .lines()
624        .filter(|l| l.contains(CRONTAB_TAG))
625        .collect();
626    if entries.is_empty() {
627        return "No pulpo schedules.".into();
628    }
629    let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
630    for entry in entries {
631        let paused = entry.starts_with('#');
632        let raw = entry.trim_start_matches('#').trim();
633        let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
634        let parts: Vec<&str> = raw.splitn(6, ' ').collect();
635        let cron_expr = if parts.len() >= 5 {
636            parts[..5].join(" ")
637        } else {
638            "?".into()
639        };
640        lines.push(format!(
641            "{:<20} {:<15} {}",
642            name,
643            cron_expr,
644            if paused { "yes" } else { "no" }
645        ));
646    }
647    lines.join("\n")
648}
649
650/// Remove a schedule from a crontab string. Returns the updated crontab.
651#[cfg_attr(coverage, allow(dead_code))]
652fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
653    use std::fmt::Write;
654    let tag = format!("{CRONTAB_TAG}{name}");
655    let filtered =
656        crontab
657            .lines()
658            .filter(|l| !l.contains(&tag))
659            .fold(String::new(), |mut acc, l| {
660                writeln!(acc, "{l}").unwrap();
661                acc
662            });
663    if filtered.len() == crontab.len() {
664        anyhow::bail!("schedule \"{name}\" not found");
665    }
666    Ok(filtered)
667}
668
669/// Pause (comment out) a schedule in a crontab string.
670#[cfg_attr(coverage, allow(dead_code))]
671fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
672    use std::fmt::Write;
673    let tag = format!("{CRONTAB_TAG}{name}");
674    let mut found = false;
675    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
676        if l.contains(&tag) && !l.starts_with('#') {
677            found = true;
678            writeln!(acc, "#{l}").unwrap();
679        } else {
680            writeln!(acc, "{l}").unwrap();
681        }
682        acc
683    });
684    if !found {
685        anyhow::bail!("schedule \"{name}\" not found or already paused");
686    }
687    Ok(updated)
688}
689
690/// Resume (uncomment) a schedule in a crontab string.
691#[cfg_attr(coverage, allow(dead_code))]
692fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
693    use std::fmt::Write;
694    let tag = format!("{CRONTAB_TAG}{name}");
695    let mut found = false;
696    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
697        if l.contains(&tag) && l.starts_with('#') {
698            found = true;
699            writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
700        } else {
701            writeln!(acc, "{l}").unwrap();
702        }
703        acc
704    });
705    if !found {
706        anyhow::bail!("schedule \"{name}\" not found or not paused");
707    }
708    Ok(updated)
709}
710
711/// Execute a schedule subcommand using the crontab wrapper.
712#[cfg(not(coverage))]
713fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
714    match action {
715        ScheduleAction::Install {
716            name,
717            cron,
718            workdir,
719            provider,
720            prompt,
721        } => {
722            let crontab = read_crontab()?;
723            let joined_prompt = prompt.join(" ");
724            let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
725            let updated = crontab_install(&crontab, name, &line)?;
726            write_crontab(&updated)?;
727            Ok(format!("Installed schedule \"{name}\""))
728        }
729        ScheduleAction::List => {
730            let crontab = read_crontab()?;
731            Ok(crontab_list(&crontab))
732        }
733        ScheduleAction::Remove { name } => {
734            let crontab = read_crontab()?;
735            let updated = crontab_remove(&crontab, name)?;
736            write_crontab(&updated)?;
737            Ok(format!("Removed schedule \"{name}\""))
738        }
739        ScheduleAction::Pause { name } => {
740            let crontab = read_crontab()?;
741            let updated = crontab_pause(&crontab, name)?;
742            write_crontab(&updated)?;
743            Ok(format!("Paused schedule \"{name}\""))
744        }
745        ScheduleAction::Resume { name } => {
746            let crontab = read_crontab()?;
747            let updated = crontab_resume(&crontab, name)?;
748            write_crontab(&updated)?;
749            Ok(format!("Resumed schedule \"{name}\""))
750        }
751    }
752}
753
754/// Stub for coverage builds — crontab is real I/O.
755#[cfg(coverage)]
756fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
757    Ok(String::new())
758}
759
760/// Execute the given CLI command against the specified node.
761#[allow(clippy::too_many_lines)]
762pub async fn execute(cli: &Cli) -> Result<String> {
763    let url = base_url(&cli.node);
764    let client = reqwest::Client::new();
765    let node = &cli.node;
766    let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;
767
768    match &cli.command {
769        Commands::Attach { name } => {
770            // Fetch session to get actual backend_session_id
771            let resp = authed_get(
772                &client,
773                format!("{url}/api/v1/sessions/{name}"),
774                token.as_deref(),
775            )
776            .send()
777            .await
778            .map_err(|e| friendly_error(&e, node))?;
779            let text = ok_or_api_error(resp).await?;
780            let session: Session = serde_json::from_str(&text)?;
781            let backend_id = session
782                .backend_session_id
783                .unwrap_or_else(|| format!("pulpo-{name}"));
784            attach_session(&backend_id)?;
785            Ok(format!("Detached from session {name}."))
786        }
787        Commands::Input { name, text } => {
788            let input_text = text.as_deref().unwrap_or("\n");
789            let body = serde_json::json!({ "text": input_text });
790            let resp = authed_post(
791                &client,
792                format!("{url}/api/v1/sessions/{name}/input"),
793                token.as_deref(),
794            )
795            .json(&body)
796            .send()
797            .await
798            .map_err(|e| friendly_error(&e, node))?;
799            ok_or_api_error(resp).await?;
800            Ok(format!("Sent input to session {name}."))
801        }
802        Commands::List => {
803            let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
804                .send()
805                .await
806                .map_err(|e| friendly_error(&e, node))?;
807            let text = ok_or_api_error(resp).await?;
808            let sessions: Vec<Session> = serde_json::from_str(&text)?;
809            Ok(format_sessions(&sessions))
810        }
811        Commands::Nodes => {
812            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
813                .send()
814                .await
815                .map_err(|e| friendly_error(&e, node))?;
816            let text = ok_or_api_error(resp).await?;
817            let resp: PeersResponse = serde_json::from_str(&text)?;
818            Ok(format_nodes(&resp))
819        }
820        Commands::Spawn {
821            workdir,
822            name,
823            provider,
824            auto,
825            guard,
826            model,
827            system_prompt,
828            allowed_tools,
829            ink,
830            max_turns,
831            max_budget,
832            output_format,
833            prompt,
834        } => {
835            let prompt_text = prompt.join(" ");
836            let mode = if *auto { "autonomous" } else { "interactive" };
837            let mut body = serde_json::json!({
838                "workdir": workdir,
839                "provider": provider,
840                "prompt": prompt_text,
841                "mode": mode,
842                "guard_preset": guard,
843            });
844            if let Some(n) = name {
845                body["name"] = serde_json::json!(n);
846            }
847            if let Some(m) = model {
848                body["model"] = serde_json::json!(m);
849            }
850            if let Some(sp) = system_prompt {
851                body["system_prompt"] = serde_json::json!(sp);
852            }
853            if let Some(tools) = allowed_tools {
854                body["allowed_tools"] = serde_json::json!(tools);
855            }
856            if let Some(p) = ink {
857                body["ink"] = serde_json::json!(p);
858            }
859            if let Some(mt) = max_turns {
860                body["max_turns"] = serde_json::json!(mt);
861            }
862            if let Some(mb) = max_budget {
863                body["max_budget_usd"] = serde_json::json!(mb);
864            }
865            if let Some(of) = output_format {
866                body["output_format"] = serde_json::json!(of);
867            }
868            let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
869                .json(&body)
870                .send()
871                .await
872                .map_err(|e| friendly_error(&e, node))?;
873            let text = ok_or_api_error(resp).await?;
874            let session: Session = serde_json::from_str(&text)?;
875            Ok(format!(
876                "Created session \"{}\" ({})",
877                session.name, session.id
878            ))
879        }
880        Commands::Kill { name } => {
881            let resp = authed_post(
882                &client,
883                format!("{url}/api/v1/sessions/{name}/kill"),
884                token.as_deref(),
885            )
886            .send()
887            .await
888            .map_err(|e| friendly_error(&e, node))?;
889            ok_or_api_error(resp).await?;
890            Ok(format!("Session {name} killed."))
891        }
892        Commands::Delete { name } => {
893            let resp = authed_delete(
894                &client,
895                format!("{url}/api/v1/sessions/{name}"),
896                token.as_deref(),
897            )
898            .send()
899            .await
900            .map_err(|e| friendly_error(&e, node))?;
901            ok_or_api_error(resp).await?;
902            Ok(format!("Session {name} deleted."))
903        }
904        Commands::Logs {
905            name,
906            lines,
907            follow,
908        } => {
909            if *follow {
910                let mut stdout = std::io::stdout();
911                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
912                    .await
913                    .map_err(|e| {
914                        // Unwrap reqwest errors to friendly messages
915                        match e.downcast::<reqwest::Error>() {
916                            Ok(re) => friendly_error(&re, node),
917                            Err(other) => other,
918                        }
919                    })?;
920                Ok(String::new())
921            } else {
922                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
923                    .await
924                    .map_err(|e| match e.downcast::<reqwest::Error>() {
925                        Ok(re) => friendly_error(&re, node),
926                        Err(other) => other,
927                    })?;
928                Ok(output)
929            }
930        }
931        Commands::Interventions { name } => {
932            let resp = authed_get(
933                &client,
934                format!("{url}/api/v1/sessions/{name}/interventions"),
935                token.as_deref(),
936            )
937            .send()
938            .await
939            .map_err(|e| friendly_error(&e, node))?;
940            let text = ok_or_api_error(resp).await?;
941            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
942            Ok(format_interventions(&events))
943        }
944        Commands::Ui => {
945            let dashboard = base_url(&cli.node);
946            open_browser(&dashboard)?;
947            Ok(format!("Opening {dashboard}"))
948        }
949        Commands::Resume { name } => {
950            let resp = authed_post(
951                &client,
952                format!("{url}/api/v1/sessions/{name}/resume"),
953                token.as_deref(),
954            )
955            .send()
956            .await
957            .map_err(|e| friendly_error(&e, node))?;
958            let text = ok_or_api_error(resp).await?;
959            let session: Session = serde_json::from_str(&text)?;
960            Ok(format!("Resumed session \"{}\"", session.name))
961        }
962        Commands::Schedule { action } => execute_schedule(action, node),
963    }
964}
965
966#[cfg(test)]
967mod tests {
968    use super::*;
969
970    #[test]
971    fn test_base_url() {
972        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
973        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
974    }
975
976    #[test]
977    fn test_cli_parse_list() {
978        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
979        assert_eq!(cli.node, "localhost:7433");
980        assert!(matches!(cli.command, Commands::List));
981    }
982
983    #[test]
984    fn test_cli_parse_nodes() {
985        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
986        assert!(matches!(cli.command, Commands::Nodes));
987    }
988
989    #[test]
990    fn test_cli_parse_ui() {
991        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
992        assert!(matches!(cli.command, Commands::Ui));
993    }
994
995    #[test]
996    fn test_cli_parse_ui_custom_node() {
997        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
998        assert!(matches!(cli.command, Commands::Ui));
999        assert_eq!(cli.node, "mac-mini:7433");
1000    }
1001
1002    #[test]
1003    fn test_build_open_command() {
1004        let cmd = build_open_command("http://localhost:7433");
1005        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1006        assert_eq!(args, vec!["http://localhost:7433"]);
1007        #[cfg(target_os = "macos")]
1008        assert_eq!(cmd.get_program(), "open");
1009        #[cfg(target_os = "linux")]
1010        assert_eq!(cmd.get_program(), "xdg-open");
1011    }
1012
1013    #[test]
1014    fn test_cli_parse_spawn() {
1015        let cli = Cli::try_parse_from([
1016            "pulpo",
1017            "spawn",
1018            "--workdir",
1019            "/tmp/repo",
1020            "Fix",
1021            "the",
1022            "bug",
1023        ])
1024        .unwrap();
1025        assert!(matches!(
1026            &cli.command,
1027            Commands::Spawn { workdir, provider, auto, guard, prompt, .. }
1028                if workdir == "/tmp/repo" && provider == "claude" && !auto
1029                && guard == "standard" && prompt == &["Fix", "the", "bug"]
1030        ));
1031    }
1032
1033    #[test]
1034    fn test_cli_parse_spawn_with_provider() {
1035        let cli = Cli::try_parse_from([
1036            "pulpo",
1037            "spawn",
1038            "--workdir",
1039            "/tmp",
1040            "--provider",
1041            "codex",
1042            "Do it",
1043        ])
1044        .unwrap();
1045        assert!(matches!(
1046            &cli.command,
1047            Commands::Spawn { provider, .. } if provider == "codex"
1048        ));
1049    }
1050
1051    #[test]
1052    fn test_cli_parse_spawn_auto() {
1053        let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "--auto", "Do it"])
1054            .unwrap();
1055        assert!(matches!(
1056            &cli.command,
1057            Commands::Spawn { auto, .. } if *auto
1058        ));
1059    }
1060
1061    #[test]
1062    fn test_cli_parse_spawn_with_guard() {
1063        let cli = Cli::try_parse_from([
1064            "pulpo",
1065            "spawn",
1066            "--workdir",
1067            "/tmp",
1068            "--guard",
1069            "strict",
1070            "Do it",
1071        ])
1072        .unwrap();
1073        assert!(matches!(
1074            &cli.command,
1075            Commands::Spawn { guard, .. } if guard == "strict"
1076        ));
1077    }
1078
1079    #[test]
1080    fn test_cli_parse_spawn_guard_default() {
1081        let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "Do it"]).unwrap();
1082        assert!(matches!(
1083            &cli.command,
1084            Commands::Spawn { guard, .. } if guard == "standard"
1085        ));
1086    }
1087
1088    #[test]
1089    fn test_cli_parse_spawn_with_name() {
1090        let cli = Cli::try_parse_from([
1091            "pulpo",
1092            "spawn",
1093            "--workdir",
1094            "/tmp/repo",
1095            "--name",
1096            "my-task",
1097            "Fix it",
1098        ])
1099        .unwrap();
1100        assert!(matches!(
1101            &cli.command,
1102            Commands::Spawn { workdir, name, .. }
1103                if workdir == "/tmp/repo" && name.as_deref() == Some("my-task")
1104        ));
1105    }
1106
1107    #[test]
1108    fn test_cli_parse_spawn_without_name() {
1109        let cli =
1110            Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"]).unwrap();
1111        assert!(matches!(
1112            &cli.command,
1113            Commands::Spawn { name, .. } if name.is_none()
1114        ));
1115    }
1116
1117    #[test]
1118    fn test_cli_parse_logs() {
1119        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1120        assert!(matches!(
1121            &cli.command,
1122            Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1123        ));
1124    }
1125
1126    #[test]
1127    fn test_cli_parse_logs_with_lines() {
1128        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1129        assert!(matches!(
1130            &cli.command,
1131            Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 50 && !follow
1132        ));
1133    }
1134
1135    #[test]
1136    fn test_cli_parse_logs_follow() {
1137        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1138        assert!(matches!(
1139            &cli.command,
1140            Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1141        ));
1142    }
1143
1144    #[test]
1145    fn test_cli_parse_logs_follow_short() {
1146        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1147        assert!(matches!(
1148            &cli.command,
1149            Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1150        ));
1151    }
1152
1153    #[test]
1154    fn test_cli_parse_kill() {
1155        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1156        assert!(matches!(
1157            &cli.command,
1158            Commands::Kill { name } if name == "my-session"
1159        ));
1160    }
1161
1162    #[test]
1163    fn test_cli_parse_delete() {
1164        let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1165        assert!(matches!(
1166            &cli.command,
1167            Commands::Delete { name } if name == "my-session"
1168        ));
1169    }
1170
1171    #[test]
1172    fn test_cli_parse_resume() {
1173        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1174        assert!(matches!(
1175            &cli.command,
1176            Commands::Resume { name } if name == "my-session"
1177        ));
1178    }
1179
1180    #[test]
1181    fn test_cli_parse_input() {
1182        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1183        assert!(matches!(
1184            &cli.command,
1185            Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("yes")
1186        ));
1187    }
1188
1189    #[test]
1190    fn test_cli_parse_input_no_text() {
1191        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1192        assert!(matches!(
1193            &cli.command,
1194            Commands::Input { name, text } if name == "my-session" && text.is_none()
1195        ));
1196    }
1197
1198    #[test]
1199    fn test_cli_parse_input_alias() {
1200        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1201        assert!(matches!(
1202            &cli.command,
1203            Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("y")
1204        ));
1205    }
1206
1207    #[test]
1208    fn test_cli_parse_custom_node() {
1209        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1210        assert_eq!(cli.node, "win-pc:8080");
1211    }
1212
1213    #[test]
1214    fn test_cli_version() {
1215        let result = Cli::try_parse_from(["pulpo", "--version"]);
1216        // clap exits with an error (kind DisplayVersion) when --version is used
1217        let err = result.unwrap_err();
1218        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1219    }
1220
1221    #[test]
1222    fn test_cli_parse_no_subcommand_fails() {
1223        let result = Cli::try_parse_from(["pulpo"]);
1224        assert!(result.is_err());
1225    }
1226
1227    #[test]
1228    fn test_cli_debug() {
1229        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1230        let debug = format!("{cli:?}");
1231        assert!(debug.contains("List"));
1232    }
1233
1234    #[test]
1235    fn test_commands_debug() {
1236        let cmd = Commands::List;
1237        assert_eq!(format!("{cmd:?}"), "List");
1238    }
1239
1240    /// A valid Session JSON for test responses.
1241    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","provider":"claude","prompt":"Fix bug","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1242
1243    /// Start a lightweight test HTTP server and return its address.
1244    async fn start_test_server() -> String {
1245        use axum::http::StatusCode;
1246        use axum::{
1247            Json, Router,
1248            routing::{get, post},
1249        };
1250
1251        let session_json = TEST_SESSION_JSON;
1252
1253        let app = Router::new()
1254            .route(
1255                "/api/v1/sessions",
1256                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1257                    (StatusCode::CREATED, session_json.to_owned())
1258                }),
1259            )
1260            .route(
1261                "/api/v1/sessions/{id}",
1262                get(move || async move { session_json.to_owned() })
1263                    .delete(|| async { StatusCode::NO_CONTENT }),
1264            )
1265            .route(
1266                "/api/v1/sessions/{id}/kill",
1267                post(|| async { StatusCode::NO_CONTENT }),
1268            )
1269            .route(
1270                "/api/v1/sessions/{id}/output",
1271                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1272            )
1273            .route(
1274                "/api/v1/peers",
1275                get(|| async {
1276                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1277                }),
1278            )
1279            .route(
1280                "/api/v1/sessions/{id}/resume",
1281                axum::routing::post(move || async move { session_json.to_owned() }),
1282            )
1283            .route(
1284                "/api/v1/sessions/{id}/interventions",
1285                get(|| async { "[]".to_owned() }),
1286            )
1287            .route(
1288                "/api/v1/sessions/{id}/input",
1289                post(|| async { StatusCode::NO_CONTENT }),
1290            );
1291
1292        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1293        let addr = listener.local_addr().unwrap();
1294        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1295        format!("127.0.0.1:{}", addr.port())
1296    }
1297
1298    #[tokio::test]
1299    async fn test_execute_list_success() {
1300        let node = start_test_server().await;
1301        let cli = Cli {
1302            node,
1303            token: None,
1304            command: Commands::List,
1305        };
1306        let result = execute(&cli).await.unwrap();
1307        assert_eq!(result, "No sessions.");
1308    }
1309
1310    #[tokio::test]
1311    async fn test_execute_nodes_success() {
1312        let node = start_test_server().await;
1313        let cli = Cli {
1314            node,
1315            token: None,
1316            command: Commands::Nodes,
1317        };
1318        let result = execute(&cli).await.unwrap();
1319        assert!(result.contains("test"));
1320        assert!(result.contains("(local)"));
1321        assert!(result.contains("NAME"));
1322    }
1323
1324    #[tokio::test]
1325    async fn test_execute_spawn_success() {
1326        let node = start_test_server().await;
1327        let cli = Cli {
1328            node,
1329            token: None,
1330            command: Commands::Spawn {
1331                workdir: "/tmp/repo".into(),
1332                name: None,
1333                provider: "claude".into(),
1334                auto: false,
1335                guard: "standard".into(),
1336                model: None,
1337                system_prompt: None,
1338                allowed_tools: None,
1339                ink: None,
1340                max_turns: None,
1341                max_budget: None,
1342                output_format: None,
1343                prompt: vec!["Fix".into(), "bug".into()],
1344            },
1345        };
1346        let result = execute(&cli).await.unwrap();
1347        assert!(result.contains("Created session"));
1348        assert!(result.contains("repo"));
1349    }
1350
1351    #[tokio::test]
1352    async fn test_execute_spawn_with_all_new_flags() {
1353        let node = start_test_server().await;
1354        let cli = Cli {
1355            node,
1356            token: None,
1357            command: Commands::Spawn {
1358                workdir: "/tmp/repo".into(),
1359                name: None,
1360                provider: "claude".into(),
1361                auto: false,
1362                guard: "standard".into(),
1363                model: Some("opus".into()),
1364                system_prompt: Some("Be helpful".into()),
1365                allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1366                ink: Some("coder".into()),
1367                max_turns: Some(5),
1368                max_budget: Some(2.5),
1369                output_format: Some("json".into()),
1370                prompt: vec!["Fix".into(), "bug".into()],
1371            },
1372        };
1373        let result = execute(&cli).await.unwrap();
1374        assert!(result.contains("Created session"));
1375    }
1376
1377    #[tokio::test]
1378    async fn test_execute_spawn_auto_mode() {
1379        let node = start_test_server().await;
1380        let cli = Cli {
1381            node,
1382            token: None,
1383            command: Commands::Spawn {
1384                workdir: "/tmp/repo".into(),
1385                name: None,
1386                provider: "claude".into(),
1387                auto: true,
1388                guard: "standard".into(),
1389                model: None,
1390                system_prompt: None,
1391                allowed_tools: None,
1392                ink: None,
1393                max_turns: None,
1394                max_budget: None,
1395                output_format: None,
1396                prompt: vec!["Do it".into()],
1397            },
1398        };
1399        let result = execute(&cli).await.unwrap();
1400        assert!(result.contains("Created session"));
1401    }
1402
1403    #[tokio::test]
1404    async fn test_execute_spawn_with_name() {
1405        let node = start_test_server().await;
1406        let cli = Cli {
1407            node,
1408            token: None,
1409            command: Commands::Spawn {
1410                workdir: "/tmp/repo".into(),
1411                name: Some("my-task".into()),
1412                provider: "claude".into(),
1413                auto: false,
1414                guard: "standard".into(),
1415                model: None,
1416                system_prompt: None,
1417                allowed_tools: None,
1418                ink: None,
1419                max_turns: None,
1420                max_budget: None,
1421                output_format: None,
1422                prompt: vec!["Fix".into(), "bug".into()],
1423            },
1424        };
1425        let result = execute(&cli).await.unwrap();
1426        assert!(result.contains("Created session"));
1427    }
1428
1429    #[tokio::test]
1430    async fn test_execute_kill_success() {
1431        let node = start_test_server().await;
1432        let cli = Cli {
1433            node,
1434            token: None,
1435            command: Commands::Kill {
1436                name: "test-session".into(),
1437            },
1438        };
1439        let result = execute(&cli).await.unwrap();
1440        assert!(result.contains("killed"));
1441    }
1442
1443    #[tokio::test]
1444    async fn test_execute_delete_success() {
1445        let node = start_test_server().await;
1446        let cli = Cli {
1447            node,
1448            token: None,
1449            command: Commands::Delete {
1450                name: "test-session".into(),
1451            },
1452        };
1453        let result = execute(&cli).await.unwrap();
1454        assert!(result.contains("deleted"));
1455    }
1456
1457    #[tokio::test]
1458    async fn test_execute_logs_success() {
1459        let node = start_test_server().await;
1460        let cli = Cli {
1461            node,
1462            token: None,
1463            command: Commands::Logs {
1464                name: "test-session".into(),
1465                lines: 50,
1466                follow: false,
1467            },
1468        };
1469        let result = execute(&cli).await.unwrap();
1470        assert!(result.contains("test output"));
1471    }
1472
1473    #[tokio::test]
1474    async fn test_execute_list_connection_refused() {
1475        let cli = Cli {
1476            node: "localhost:1".into(),
1477            token: None,
1478            command: Commands::List,
1479        };
1480        let result = execute(&cli).await;
1481        let err = result.unwrap_err().to_string();
1482        assert!(
1483            err.contains("Could not connect to pulpod"),
1484            "Expected friendly error, got: {err}"
1485        );
1486        assert!(err.contains("localhost:1"));
1487    }
1488
1489    #[tokio::test]
1490    async fn test_execute_nodes_connection_refused() {
1491        let cli = Cli {
1492            node: "localhost:1".into(),
1493            token: None,
1494            command: Commands::Nodes,
1495        };
1496        let result = execute(&cli).await;
1497        let err = result.unwrap_err().to_string();
1498        assert!(err.contains("Could not connect to pulpod"));
1499    }
1500
1501    #[tokio::test]
1502    async fn test_execute_kill_error_response() {
1503        use axum::{Router, http::StatusCode, routing::post};
1504
1505        let app = Router::new().route(
1506            "/api/v1/sessions/{id}/kill",
1507            post(|| async {
1508                (
1509                    StatusCode::NOT_FOUND,
1510                    "{\"error\":\"session not found: test-session\"}",
1511                )
1512            }),
1513        );
1514        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1515        let addr = listener.local_addr().unwrap();
1516        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1517        let node = format!("127.0.0.1:{}", addr.port());
1518
1519        let cli = Cli {
1520            node,
1521            token: None,
1522            command: Commands::Kill {
1523                name: "test-session".into(),
1524            },
1525        };
1526        let err = execute(&cli).await.unwrap_err();
1527        assert_eq!(err.to_string(), "session not found: test-session");
1528    }
1529
1530    #[tokio::test]
1531    async fn test_execute_delete_error_response() {
1532        use axum::{Router, http::StatusCode, routing::delete};
1533
1534        let app = Router::new().route(
1535            "/api/v1/sessions/{id}",
1536            delete(|| async {
1537                (
1538                    StatusCode::CONFLICT,
1539                    "{\"error\":\"cannot delete session in 'running' state\"}",
1540                )
1541            }),
1542        );
1543        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1544        let addr = listener.local_addr().unwrap();
1545        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1546        let node = format!("127.0.0.1:{}", addr.port());
1547
1548        let cli = Cli {
1549            node,
1550            token: None,
1551            command: Commands::Delete {
1552                name: "test-session".into(),
1553            },
1554        };
1555        let err = execute(&cli).await.unwrap_err();
1556        assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1557    }
1558
1559    #[tokio::test]
1560    async fn test_execute_logs_error_response() {
1561        use axum::{Router, http::StatusCode, routing::get};
1562
1563        let app = Router::new().route(
1564            "/api/v1/sessions/{id}/output",
1565            get(|| async {
1566                (
1567                    StatusCode::NOT_FOUND,
1568                    "{\"error\":\"session not found: ghost\"}",
1569                )
1570            }),
1571        );
1572        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1573        let addr = listener.local_addr().unwrap();
1574        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1575        let node = format!("127.0.0.1:{}", addr.port());
1576
1577        let cli = Cli {
1578            node,
1579            token: None,
1580            command: Commands::Logs {
1581                name: "ghost".into(),
1582                lines: 50,
1583                follow: false,
1584            },
1585        };
1586        let err = execute(&cli).await.unwrap_err();
1587        assert_eq!(err.to_string(), "session not found: ghost");
1588    }
1589
1590    #[tokio::test]
1591    async fn test_execute_resume_error_response() {
1592        use axum::{Router, http::StatusCode, routing::post};
1593
1594        let app = Router::new().route(
1595            "/api/v1/sessions/{id}/resume",
1596            post(|| async {
1597                (
1598                    StatusCode::BAD_REQUEST,
1599                    "{\"error\":\"session is not stale (status: running)\"}",
1600                )
1601            }),
1602        );
1603        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1604        let addr = listener.local_addr().unwrap();
1605        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1606        let node = format!("127.0.0.1:{}", addr.port());
1607
1608        let cli = Cli {
1609            node,
1610            token: None,
1611            command: Commands::Resume {
1612                name: "test-session".into(),
1613            },
1614        };
1615        let err = execute(&cli).await.unwrap_err();
1616        assert_eq!(err.to_string(), "session is not stale (status: running)");
1617    }
1618
1619    #[tokio::test]
1620    async fn test_execute_spawn_error_response() {
1621        use axum::{Router, http::StatusCode, routing::post};
1622
1623        let app = Router::new().route(
1624            "/api/v1/sessions",
1625            post(|| async {
1626                (
1627                    StatusCode::INTERNAL_SERVER_ERROR,
1628                    "{\"error\":\"failed to spawn session\"}",
1629                )
1630            }),
1631        );
1632        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1633        let addr = listener.local_addr().unwrap();
1634        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1635        let node = format!("127.0.0.1:{}", addr.port());
1636
1637        let cli = Cli {
1638            node,
1639            token: None,
1640            command: Commands::Spawn {
1641                workdir: "/tmp/repo".into(),
1642                name: None,
1643                provider: "claude".into(),
1644                auto: false,
1645                guard: "standard".into(),
1646                model: None,
1647                system_prompt: None,
1648                allowed_tools: None,
1649                ink: None,
1650                max_turns: None,
1651                max_budget: None,
1652                output_format: None,
1653                prompt: vec!["test".into()],
1654            },
1655        };
1656        let err = execute(&cli).await.unwrap_err();
1657        assert_eq!(err.to_string(), "failed to spawn session");
1658    }
1659
1660    #[tokio::test]
1661    async fn test_execute_interventions_error_response() {
1662        use axum::{Router, http::StatusCode, routing::get};
1663
1664        let app = Router::new().route(
1665            "/api/v1/sessions/{id}/interventions",
1666            get(|| async {
1667                (
1668                    StatusCode::NOT_FOUND,
1669                    "{\"error\":\"session not found: ghost\"}",
1670                )
1671            }),
1672        );
1673        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1674        let addr = listener.local_addr().unwrap();
1675        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1676        let node = format!("127.0.0.1:{}", addr.port());
1677
1678        let cli = Cli {
1679            node,
1680            token: None,
1681            command: Commands::Interventions {
1682                name: "ghost".into(),
1683            },
1684        };
1685        let err = execute(&cli).await.unwrap_err();
1686        assert_eq!(err.to_string(), "session not found: ghost");
1687    }
1688
1689    #[tokio::test]
1690    async fn test_execute_resume_success() {
1691        let node = start_test_server().await;
1692        let cli = Cli {
1693            node,
1694            token: None,
1695            command: Commands::Resume {
1696                name: "test-session".into(),
1697            },
1698        };
1699        let result = execute(&cli).await.unwrap();
1700        assert!(result.contains("Resumed session"));
1701        assert!(result.contains("repo"));
1702    }
1703
1704    #[tokio::test]
1705    async fn test_execute_input_success() {
1706        let node = start_test_server().await;
1707        let cli = Cli {
1708            node,
1709            token: None,
1710            command: Commands::Input {
1711                name: "test-session".into(),
1712                text: Some("yes".into()),
1713            },
1714        };
1715        let result = execute(&cli).await.unwrap();
1716        assert!(result.contains("Sent input to session test-session"));
1717    }
1718
1719    #[tokio::test]
1720    async fn test_execute_input_no_text() {
1721        let node = start_test_server().await;
1722        let cli = Cli {
1723            node,
1724            token: None,
1725            command: Commands::Input {
1726                name: "test-session".into(),
1727                text: None,
1728            },
1729        };
1730        let result = execute(&cli).await.unwrap();
1731        assert!(result.contains("Sent input to session test-session"));
1732    }
1733
1734    #[tokio::test]
1735    async fn test_execute_input_connection_refused() {
1736        let cli = Cli {
1737            node: "localhost:1".into(),
1738            token: None,
1739            command: Commands::Input {
1740                name: "test".into(),
1741                text: Some("y".into()),
1742            },
1743        };
1744        let result = execute(&cli).await;
1745        let err = result.unwrap_err().to_string();
1746        assert!(err.contains("Could not connect to pulpod"));
1747    }
1748
1749    #[tokio::test]
1750    async fn test_execute_input_error_response() {
1751        use axum::{Router, http::StatusCode, routing::post};
1752
1753        let app = Router::new().route(
1754            "/api/v1/sessions/{id}/input",
1755            post(|| async {
1756                (
1757                    StatusCode::NOT_FOUND,
1758                    "{\"error\":\"session not found: ghost\"}",
1759                )
1760            }),
1761        );
1762        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1763        let addr = listener.local_addr().unwrap();
1764        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1765        let node = format!("127.0.0.1:{}", addr.port());
1766
1767        let cli = Cli {
1768            node,
1769            token: None,
1770            command: Commands::Input {
1771                name: "ghost".into(),
1772                text: Some("y".into()),
1773            },
1774        };
1775        let err = execute(&cli).await.unwrap_err();
1776        assert_eq!(err.to_string(), "session not found: ghost");
1777    }
1778
1779    #[tokio::test]
1780    async fn test_execute_ui() {
1781        let cli = Cli {
1782            node: "localhost:7433".into(),
1783            token: None,
1784            command: Commands::Ui,
1785        };
1786        let result = execute(&cli).await.unwrap();
1787        assert!(result.contains("Opening"));
1788        assert!(result.contains("http://localhost:7433"));
1789    }
1790
1791    #[tokio::test]
1792    async fn test_execute_ui_custom_node() {
1793        let cli = Cli {
1794            node: "mac-mini:7433".into(),
1795            token: None,
1796            command: Commands::Ui,
1797        };
1798        let result = execute(&cli).await.unwrap();
1799        assert!(result.contains("http://mac-mini:7433"));
1800    }
1801
1802    #[test]
1803    fn test_format_sessions_empty() {
1804        assert_eq!(format_sessions(&[]), "No sessions.");
1805    }
1806
1807    #[test]
1808    fn test_format_sessions_with_data() {
1809        use chrono::Utc;
1810        use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1811        use uuid::Uuid;
1812
1813        let sessions = vec![Session {
1814            id: Uuid::nil(),
1815            name: "my-api".into(),
1816            workdir: "/tmp/repo".into(),
1817            provider: Provider::Claude,
1818            prompt: "Fix the bug".into(),
1819            status: SessionStatus::Running,
1820            mode: SessionMode::Interactive,
1821            conversation_id: None,
1822            exit_code: None,
1823            backend_session_id: None,
1824            output_snapshot: None,
1825            guard_config: None,
1826            model: None,
1827            allowed_tools: None,
1828            system_prompt: None,
1829            metadata: None,
1830            ink: None,
1831            max_turns: None,
1832            max_budget_usd: None,
1833            output_format: None,
1834            intervention_reason: None,
1835            intervention_at: None,
1836            last_output_at: None,
1837            idle_since: None,
1838            waiting_for_input: false,
1839            created_at: Utc::now(),
1840            updated_at: Utc::now(),
1841        }];
1842        let output = format_sessions(&sessions);
1843        assert!(output.contains("NAME"));
1844        assert!(output.contains("my-api"));
1845        assert!(output.contains("running"));
1846        assert!(output.contains("claude"));
1847        assert!(output.contains("Fix the bug"));
1848    }
1849
1850    #[test]
1851    fn test_format_sessions_long_prompt_truncated() {
1852        use chrono::Utc;
1853        use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1854        use uuid::Uuid;
1855
1856        let sessions = vec![Session {
1857            id: Uuid::nil(),
1858            name: "test".into(),
1859            workdir: "/tmp".into(),
1860            provider: Provider::Codex,
1861            prompt: "A very long prompt that exceeds forty characters in total length".into(),
1862            status: SessionStatus::Completed,
1863            mode: SessionMode::Autonomous,
1864            conversation_id: None,
1865            exit_code: None,
1866            backend_session_id: None,
1867            output_snapshot: None,
1868            guard_config: None,
1869            model: None,
1870            allowed_tools: None,
1871            system_prompt: None,
1872            metadata: None,
1873            ink: None,
1874            max_turns: None,
1875            max_budget_usd: None,
1876            output_format: None,
1877            intervention_reason: None,
1878            intervention_at: None,
1879            last_output_at: None,
1880            idle_since: None,
1881            waiting_for_input: false,
1882            created_at: Utc::now(),
1883            updated_at: Utc::now(),
1884        }];
1885        let output = format_sessions(&sessions);
1886        assert!(output.contains("..."));
1887    }
1888
1889    #[test]
1890    fn test_format_sessions_waiting_for_input() {
1891        use chrono::Utc;
1892        use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1893        use uuid::Uuid;
1894
1895        let sessions = vec![Session {
1896            id: Uuid::nil(),
1897            name: "blocked".into(),
1898            workdir: "/tmp".into(),
1899            provider: Provider::Claude,
1900            prompt: "Fix bug".into(),
1901            status: SessionStatus::Running,
1902            mode: SessionMode::Interactive,
1903            conversation_id: None,
1904            exit_code: None,
1905            backend_session_id: None,
1906            output_snapshot: None,
1907            guard_config: None,
1908            model: None,
1909            allowed_tools: None,
1910            system_prompt: None,
1911            metadata: None,
1912            ink: None,
1913            max_turns: None,
1914            max_budget_usd: None,
1915            output_format: None,
1916            intervention_reason: None,
1917            intervention_at: None,
1918            last_output_at: None,
1919            idle_since: None,
1920            waiting_for_input: true,
1921            created_at: Utc::now(),
1922            updated_at: Utc::now(),
1923        }];
1924        let output = format_sessions(&sessions);
1925        assert!(output.contains("waiting"));
1926        assert!(!output.contains("running"));
1927    }
1928
1929    #[test]
1930    fn test_format_nodes() {
1931        use pulpo_common::node::NodeInfo;
1932        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
1933
1934        let resp = PeersResponse {
1935            local: NodeInfo {
1936                name: "mac-mini".into(),
1937                hostname: "h".into(),
1938                os: "macos".into(),
1939                arch: "arm64".into(),
1940                cpus: 8,
1941                memory_mb: 16384,
1942                gpu: None,
1943            },
1944            peers: vec![PeerInfo {
1945                name: "win-pc".into(),
1946                address: "win-pc:7433".into(),
1947                status: PeerStatus::Online,
1948                node_info: None,
1949                session_count: Some(3),
1950                source: PeerSource::Configured,
1951            }],
1952        };
1953        let output = format_nodes(&resp);
1954        assert!(output.contains("mac-mini"));
1955        assert!(output.contains("(local)"));
1956        assert!(output.contains("win-pc"));
1957        assert!(output.contains('3'));
1958    }
1959
1960    #[test]
1961    fn test_format_nodes_no_session_count() {
1962        use pulpo_common::node::NodeInfo;
1963        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
1964
1965        let resp = PeersResponse {
1966            local: NodeInfo {
1967                name: "local".into(),
1968                hostname: "h".into(),
1969                os: "linux".into(),
1970                arch: "x86_64".into(),
1971                cpus: 4,
1972                memory_mb: 8192,
1973                gpu: None,
1974            },
1975            peers: vec![PeerInfo {
1976                name: "peer".into(),
1977                address: "peer:7433".into(),
1978                status: PeerStatus::Offline,
1979                node_info: None,
1980                session_count: None,
1981                source: PeerSource::Configured,
1982            }],
1983        };
1984        let output = format_nodes(&resp);
1985        assert!(output.contains("offline"));
1986        // No session count → shows "-"
1987        let lines: Vec<&str> = output.lines().collect();
1988        assert!(lines[2].contains('-'));
1989    }
1990
1991    #[tokio::test]
1992    async fn test_execute_resume_connection_refused() {
1993        let cli = Cli {
1994            node: "localhost:1".into(),
1995            token: None,
1996            command: Commands::Resume {
1997                name: "test".into(),
1998            },
1999        };
2000        let result = execute(&cli).await;
2001        let err = result.unwrap_err().to_string();
2002        assert!(err.contains("Could not connect to pulpod"));
2003    }
2004
2005    #[tokio::test]
2006    async fn test_execute_spawn_connection_refused() {
2007        let cli = Cli {
2008            node: "localhost:1".into(),
2009            token: None,
2010            command: Commands::Spawn {
2011                workdir: "/tmp".into(),
2012                name: None,
2013                provider: "claude".into(),
2014                auto: false,
2015                guard: "standard".into(),
2016                model: None,
2017                system_prompt: None,
2018                allowed_tools: None,
2019                ink: None,
2020                max_turns: None,
2021                max_budget: None,
2022                output_format: None,
2023                prompt: vec!["test".into()],
2024            },
2025        };
2026        let result = execute(&cli).await;
2027        let err = result.unwrap_err().to_string();
2028        assert!(err.contains("Could not connect to pulpod"));
2029    }
2030
2031    #[tokio::test]
2032    async fn test_execute_kill_connection_refused() {
2033        let cli = Cli {
2034            node: "localhost:1".into(),
2035            token: None,
2036            command: Commands::Kill {
2037                name: "test".into(),
2038            },
2039        };
2040        let result = execute(&cli).await;
2041        let err = result.unwrap_err().to_string();
2042        assert!(err.contains("Could not connect to pulpod"));
2043    }
2044
2045    #[tokio::test]
2046    async fn test_execute_delete_connection_refused() {
2047        let cli = Cli {
2048            node: "localhost:1".into(),
2049            token: None,
2050            command: Commands::Delete {
2051                name: "test".into(),
2052            },
2053        };
2054        let result = execute(&cli).await;
2055        let err = result.unwrap_err().to_string();
2056        assert!(err.contains("Could not connect to pulpod"));
2057    }
2058
2059    #[tokio::test]
2060    async fn test_execute_logs_connection_refused() {
2061        let cli = Cli {
2062            node: "localhost:1".into(),
2063            token: None,
2064            command: Commands::Logs {
2065                name: "test".into(),
2066                lines: 50,
2067                follow: false,
2068            },
2069        };
2070        let result = execute(&cli).await;
2071        let err = result.unwrap_err().to_string();
2072        assert!(err.contains("Could not connect to pulpod"));
2073    }
2074
2075    #[tokio::test]
2076    async fn test_friendly_error_connect() {
2077        // Make a request to a closed port to get a connect error
2078        let err = reqwest::Client::new()
2079            .get("http://127.0.0.1:1")
2080            .send()
2081            .await
2082            .unwrap_err();
2083        let friendly = friendly_error(&err, "test-node:1");
2084        let msg = friendly.to_string();
2085        assert!(
2086            msg.contains("Could not connect"),
2087            "Expected connect message, got: {msg}"
2088        );
2089    }
2090
2091    #[tokio::test]
2092    async fn test_friendly_error_other() {
2093        // A request to an invalid URL creates a builder error, not a connect error
2094        let err = reqwest::Client::new()
2095            .get("http://[::invalid::url")
2096            .send()
2097            .await
2098            .unwrap_err();
2099        let friendly = friendly_error(&err, "bad-host");
2100        let msg = friendly.to_string();
2101        assert!(
2102            msg.contains("Network error"),
2103            "Expected network error message, got: {msg}"
2104        );
2105        assert!(msg.contains("bad-host"));
2106    }
2107
2108    // -- Auth helper tests --
2109
2110    #[test]
2111    fn test_is_localhost_variants() {
2112        assert!(is_localhost("localhost:7433"));
2113        assert!(is_localhost("127.0.0.1:7433"));
2114        assert!(is_localhost("[::1]:7433"));
2115        assert!(is_localhost("::1"));
2116        assert!(is_localhost("localhost"));
2117        assert!(!is_localhost("mac-mini:7433"));
2118        assert!(!is_localhost("192.168.1.100:7433"));
2119    }
2120
2121    #[test]
2122    fn test_authed_get_with_token() {
2123        let client = reqwest::Client::new();
2124        let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2125            .build()
2126            .unwrap();
2127        let auth = req
2128            .headers()
2129            .get("authorization")
2130            .unwrap()
2131            .to_str()
2132            .unwrap();
2133        assert_eq!(auth, "Bearer tok");
2134    }
2135
2136    #[test]
2137    fn test_authed_get_without_token() {
2138        let client = reqwest::Client::new();
2139        let req = authed_get(&client, "http://h:1/api".into(), None)
2140            .build()
2141            .unwrap();
2142        assert!(req.headers().get("authorization").is_none());
2143    }
2144
2145    #[test]
2146    fn test_authed_post_with_token() {
2147        let client = reqwest::Client::new();
2148        let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2149            .build()
2150            .unwrap();
2151        let auth = req
2152            .headers()
2153            .get("authorization")
2154            .unwrap()
2155            .to_str()
2156            .unwrap();
2157        assert_eq!(auth, "Bearer secret");
2158    }
2159
2160    #[test]
2161    fn test_authed_post_without_token() {
2162        let client = reqwest::Client::new();
2163        let req = authed_post(&client, "http://h:1/api".into(), None)
2164            .build()
2165            .unwrap();
2166        assert!(req.headers().get("authorization").is_none());
2167    }
2168
2169    #[test]
2170    fn test_authed_delete_with_token() {
2171        let client = reqwest::Client::new();
2172        let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2173            .build()
2174            .unwrap();
2175        let auth = req
2176            .headers()
2177            .get("authorization")
2178            .unwrap()
2179            .to_str()
2180            .unwrap();
2181        assert_eq!(auth, "Bearer del-tok");
2182    }
2183
2184    #[test]
2185    fn test_authed_delete_without_token() {
2186        let client = reqwest::Client::new();
2187        let req = authed_delete(&client, "http://h:1/api".into(), None)
2188            .build()
2189            .unwrap();
2190        assert!(req.headers().get("authorization").is_none());
2191    }
2192
2193    #[tokio::test]
2194    async fn test_resolve_token_explicit() {
2195        let client = reqwest::Client::new();
2196        let token =
2197            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2198        assert_eq!(token, Some("my-tok".into()));
2199    }
2200
2201    #[tokio::test]
2202    async fn test_resolve_token_remote_no_explicit() {
2203        let client = reqwest::Client::new();
2204        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2205        assert_eq!(token, None);
2206    }
2207
2208    #[tokio::test]
2209    async fn test_resolve_token_localhost_auto_discover() {
2210        use axum::{Json, Router, routing::get};
2211
2212        let app = Router::new().route(
2213            "/api/v1/auth/token",
2214            get(|| async {
2215                Json(AuthTokenResponse {
2216                    token: "discovered".into(),
2217                })
2218            }),
2219        );
2220        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2221        let addr = listener.local_addr().unwrap();
2222        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2223
2224        let node = format!("localhost:{}", addr.port());
2225        let base = base_url(&node);
2226        let client = reqwest::Client::new();
2227        let token = resolve_token(&client, &base, &node, None).await;
2228        assert_eq!(token, Some("discovered".into()));
2229    }
2230
2231    #[tokio::test]
2232    async fn test_discover_token_empty_returns_none() {
2233        use axum::{Json, Router, routing::get};
2234
2235        let app = Router::new().route(
2236            "/api/v1/auth/token",
2237            get(|| async {
2238                Json(AuthTokenResponse {
2239                    token: String::new(),
2240                })
2241            }),
2242        );
2243        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2244        let addr = listener.local_addr().unwrap();
2245        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2246
2247        let base = format!("http://127.0.0.1:{}", addr.port());
2248        let client = reqwest::Client::new();
2249        assert_eq!(discover_token(&client, &base).await, None);
2250    }
2251
2252    #[tokio::test]
2253    async fn test_discover_token_unreachable_returns_none() {
2254        let client = reqwest::Client::new();
2255        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2256    }
2257
2258    #[test]
2259    fn test_cli_parse_with_token() {
2260        let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
2261        assert_eq!(cli.token, Some("my-secret".into()));
2262    }
2263
2264    #[test]
2265    fn test_cli_parse_without_token() {
2266        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2267        assert_eq!(cli.token, None);
2268    }
2269
2270    #[tokio::test]
2271    async fn test_execute_with_explicit_token_sends_header() {
2272        use axum::{Router, extract::Request, http::StatusCode, routing::get};
2273
2274        let app = Router::new().route(
2275            "/api/v1/sessions",
2276            get(|req: Request| async move {
2277                let auth = req
2278                    .headers()
2279                    .get("authorization")
2280                    .and_then(|v| v.to_str().ok())
2281                    .unwrap_or("");
2282                assert_eq!(auth, "Bearer test-token");
2283                (StatusCode::OK, "[]".to_owned())
2284            }),
2285        );
2286        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2287        let addr = listener.local_addr().unwrap();
2288        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2289        let node = format!("127.0.0.1:{}", addr.port());
2290
2291        let cli = Cli {
2292            node,
2293            token: Some("test-token".into()),
2294            command: Commands::List,
2295        };
2296        let result = execute(&cli).await.unwrap();
2297        assert_eq!(result, "No sessions.");
2298    }
2299
2300    // -- Interventions tests --
2301
2302    #[test]
2303    fn test_cli_parse_interventions() {
2304        let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
2305        assert!(matches!(
2306            &cli.command,
2307            Commands::Interventions { name } if name == "my-session"
2308        ));
2309    }
2310
2311    #[test]
2312    fn test_format_interventions_empty() {
2313        assert_eq!(format_interventions(&[]), "No intervention events.");
2314    }
2315
2316    #[test]
2317    fn test_format_interventions_with_data() {
2318        let events = vec![
2319            InterventionEventResponse {
2320                id: 1,
2321                session_id: "sess-1".into(),
2322                reason: "Memory exceeded threshold".into(),
2323                created_at: "2026-01-01T00:00:00Z".into(),
2324            },
2325            InterventionEventResponse {
2326                id: 2,
2327                session_id: "sess-1".into(),
2328                reason: "Idle for 10 minutes".into(),
2329                created_at: "2026-01-02T00:00:00Z".into(),
2330            },
2331        ];
2332        let output = format_interventions(&events);
2333        assert!(output.contains("ID"));
2334        assert!(output.contains("TIMESTAMP"));
2335        assert!(output.contains("REASON"));
2336        assert!(output.contains("Memory exceeded threshold"));
2337        assert!(output.contains("Idle for 10 minutes"));
2338        assert!(output.contains("2026-01-01T00:00:00Z"));
2339    }
2340
2341    #[tokio::test]
2342    async fn test_execute_interventions_empty() {
2343        let node = start_test_server().await;
2344        let cli = Cli {
2345            node,
2346            token: None,
2347            command: Commands::Interventions {
2348                name: "my-session".into(),
2349            },
2350        };
2351        let result = execute(&cli).await.unwrap();
2352        assert_eq!(result, "No intervention events.");
2353    }
2354
2355    #[tokio::test]
2356    async fn test_execute_interventions_with_data() {
2357        use axum::{Router, routing::get};
2358
2359        let app = Router::new().route(
2360            "/api/v1/sessions/{id}/interventions",
2361            get(|| async {
2362                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2363                    .to_owned()
2364            }),
2365        );
2366        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2367        let addr = listener.local_addr().unwrap();
2368        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2369        let node = format!("127.0.0.1:{}", addr.port());
2370
2371        let cli = Cli {
2372            node,
2373            token: None,
2374            command: Commands::Interventions {
2375                name: "test".into(),
2376            },
2377        };
2378        let result = execute(&cli).await.unwrap();
2379        assert!(result.contains("OOM"));
2380        assert!(result.contains("2026-01-01T00:00:00Z"));
2381    }
2382
2383    #[tokio::test]
2384    async fn test_execute_interventions_connection_refused() {
2385        let cli = Cli {
2386            node: "localhost:1".into(),
2387            token: None,
2388            command: Commands::Interventions {
2389                name: "test".into(),
2390            },
2391        };
2392        let result = execute(&cli).await;
2393        let err = result.unwrap_err().to_string();
2394        assert!(err.contains("Could not connect to pulpod"));
2395    }
2396
2397    // -- Attach command tests --
2398
2399    #[test]
2400    fn test_build_attach_command() {
2401        let cmd = build_attach_command("pulpo-my-session");
2402        assert_eq!(cmd.get_program(), "tmux");
2403        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2404        assert_eq!(args, vec!["attach-session", "-t", "pulpo-my-session"]);
2405    }
2406
2407    #[test]
2408    fn test_cli_parse_attach() {
2409        let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2410        assert!(matches!(
2411            &cli.command,
2412            Commands::Attach { name } if name == "my-session"
2413        ));
2414    }
2415
2416    #[test]
2417    fn test_cli_parse_attach_alias() {
2418        let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2419        assert!(matches!(
2420            &cli.command,
2421            Commands::Attach { name } if name == "my-session"
2422        ));
2423    }
2424
2425    #[tokio::test]
2426    async fn test_execute_attach_success() {
2427        let node = start_test_server().await;
2428        let cli = Cli {
2429            node,
2430            token: None,
2431            command: Commands::Attach {
2432                name: "test-session".into(),
2433            },
2434        };
2435        let result = execute(&cli).await.unwrap();
2436        assert!(result.contains("Detached from session test-session"));
2437    }
2438
2439    #[tokio::test]
2440    async fn test_execute_attach_with_backend_session_id() {
2441        use axum::{Router, routing::get};
2442        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","provider":"claude","prompt":"test","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"pulpo-my-session","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2443        let app = Router::new().route(
2444            "/api/v1/sessions/{id}",
2445            get(move || async move { session_json.to_owned() }),
2446        );
2447        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2448        let addr = listener.local_addr().unwrap();
2449        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2450
2451        let cli = Cli {
2452            node: format!("127.0.0.1:{}", addr.port()),
2453            token: None,
2454            command: Commands::Attach {
2455                name: "my-session".into(),
2456            },
2457        };
2458        let result = execute(&cli).await.unwrap();
2459        assert!(result.contains("Detached from session my-session"));
2460    }
2461
2462    #[tokio::test]
2463    async fn test_execute_attach_connection_refused() {
2464        let cli = Cli {
2465            node: "localhost:1".into(),
2466            token: None,
2467            command: Commands::Attach {
2468                name: "test-session".into(),
2469            },
2470        };
2471        let result = execute(&cli).await;
2472        let err = result.unwrap_err().to_string();
2473        assert!(err.contains("Could not connect to pulpod"));
2474    }
2475
2476    #[tokio::test]
2477    async fn test_execute_attach_error_response() {
2478        use axum::{Router, http::StatusCode, routing::get};
2479        let app = Router::new().route(
2480            "/api/v1/sessions/{id}",
2481            get(|| async {
2482                (
2483                    StatusCode::NOT_FOUND,
2484                    r#"{"error":"session not found"}"#.to_owned(),
2485                )
2486            }),
2487        );
2488        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2489        let addr = listener.local_addr().unwrap();
2490        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2491
2492        let cli = Cli {
2493            node: format!("127.0.0.1:{}", addr.port()),
2494            token: None,
2495            command: Commands::Attach {
2496                name: "nonexistent".into(),
2497            },
2498        };
2499        let result = execute(&cli).await;
2500        let err = result.unwrap_err().to_string();
2501        assert!(err.contains("session not found"));
2502    }
2503
2504    // -- Alias parse tests --
2505
2506    #[test]
2507    fn test_cli_parse_alias_spawn() {
2508        let cli = Cli::try_parse_from(["pulpo", "s", "--workdir", "/tmp", "Do it"]).unwrap();
2509        assert!(matches!(&cli.command, Commands::Spawn { .. }));
2510    }
2511
2512    #[test]
2513    fn test_cli_parse_alias_list() {
2514        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2515        assert!(matches!(cli.command, Commands::List));
2516    }
2517
2518    #[test]
2519    fn test_cli_parse_alias_logs() {
2520        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2521        assert!(matches!(
2522            &cli.command,
2523            Commands::Logs { name, .. } if name == "my-session"
2524        ));
2525    }
2526
2527    #[test]
2528    fn test_cli_parse_alias_kill() {
2529        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2530        assert!(matches!(
2531            &cli.command,
2532            Commands::Kill { name } if name == "my-session"
2533        ));
2534    }
2535
2536    #[test]
2537    fn test_cli_parse_alias_delete() {
2538        let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2539        assert!(matches!(
2540            &cli.command,
2541            Commands::Delete { name } if name == "my-session"
2542        ));
2543    }
2544
2545    #[test]
2546    fn test_cli_parse_alias_resume() {
2547        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
2548        assert!(matches!(
2549            &cli.command,
2550            Commands::Resume { name } if name == "my-session"
2551        ));
2552    }
2553
2554    #[test]
2555    fn test_cli_parse_alias_nodes() {
2556        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
2557        assert!(matches!(cli.command, Commands::Nodes));
2558    }
2559
2560    #[test]
2561    fn test_cli_parse_alias_interventions() {
2562        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
2563        assert!(matches!(
2564            &cli.command,
2565            Commands::Interventions { name } if name == "my-session"
2566        ));
2567    }
2568
2569    #[test]
2570    fn test_api_error_json() {
2571        let err = api_error("{\"error\":\"session not found: foo\"}");
2572        assert_eq!(err.to_string(), "session not found: foo");
2573    }
2574
2575    #[test]
2576    fn test_api_error_plain_text() {
2577        let err = api_error("plain text error");
2578        assert_eq!(err.to_string(), "plain text error");
2579    }
2580
2581    // -- diff_output tests --
2582
2583    #[test]
2584    fn test_diff_output_empty_prev() {
2585        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
2586    }
2587
2588    #[test]
2589    fn test_diff_output_identical() {
2590        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
2591    }
2592
2593    #[test]
2594    fn test_diff_output_new_lines_appended() {
2595        let prev = "line1\nline2";
2596        let new = "line1\nline2\nline3\nline4";
2597        assert_eq!(diff_output(prev, new), "line3\nline4");
2598    }
2599
2600    #[test]
2601    fn test_diff_output_scrolled_window() {
2602        // Window of 3 lines: old lines scroll off top, new appear at bottom
2603        let prev = "line1\nline2\nline3";
2604        let new = "line2\nline3\nline4";
2605        assert_eq!(diff_output(prev, new), "line4");
2606    }
2607
2608    #[test]
2609    fn test_diff_output_completely_different() {
2610        let prev = "aaa\nbbb";
2611        let new = "xxx\nyyy";
2612        assert_eq!(diff_output(prev, new), "xxx\nyyy");
2613    }
2614
2615    #[test]
2616    fn test_diff_output_last_line_matches_but_overlap_fails() {
2617        // Last line of prev appears in new but preceding lines don't match
2618        let prev = "aaa\ncommon";
2619        let new = "zzz\ncommon\nnew_line";
2620        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
2621        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
2622        // Falls through, no verified overlap, so returns everything
2623        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
2624    }
2625
2626    #[test]
2627    fn test_diff_output_new_empty() {
2628        assert_eq!(diff_output("line1", ""), "");
2629    }
2630
2631    // -- follow_logs tests --
2632
2633    /// Start a test server that simulates evolving output and session status transitions.
2634    async fn start_follow_test_server() -> String {
2635        use axum::{Router, extract::Path, extract::Query, routing::get};
2636        use std::sync::Arc;
2637        use std::sync::atomic::{AtomicUsize, Ordering};
2638
2639        let call_count = Arc::new(AtomicUsize::new(0));
2640        let output_count = call_count.clone();
2641        let status_count = Arc::new(AtomicUsize::new(0));
2642        let status_count_inner = status_count.clone();
2643
2644        let app = Router::new()
2645            .route(
2646                "/api/v1/sessions/{id}/output",
2647                get(
2648                    move |_path: Path<String>,
2649                          _query: Query<std::collections::HashMap<String, String>>| {
2650                        let count = output_count.clone();
2651                        async move {
2652                            let n = count.fetch_add(1, Ordering::SeqCst);
2653                            let output = match n {
2654                                0 => "line1\nline2".to_owned(),
2655                                1 => "line1\nline2\nline3".to_owned(),
2656                                _ => "line2\nline3\nline4".to_owned(),
2657                            };
2658                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2659                        }
2660                    },
2661                ),
2662            )
2663            .route(
2664                "/api/v1/sessions/{id}",
2665                get(move |_path: Path<String>| {
2666                    let count = status_count_inner.clone();
2667                    async move {
2668                        let n = count.fetch_add(1, Ordering::SeqCst);
2669                        let status = if n < 2 { "running" } else { "completed" };
2670                        format!(
2671                            r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
2672                        )
2673                    }
2674                }),
2675            );
2676
2677        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2678        let addr = listener.local_addr().unwrap();
2679        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2680        format!("http://127.0.0.1:{}", addr.port())
2681    }
2682
2683    #[tokio::test]
2684    async fn test_follow_logs_polls_and_exits_on_completed() {
2685        let base = start_follow_test_server().await;
2686        let client = reqwest::Client::new();
2687        let mut buf = Vec::new();
2688
2689        follow_logs(&client, &base, "test", 100, None, &mut buf)
2690            .await
2691            .unwrap();
2692
2693        let output = String::from_utf8(buf).unwrap();
2694        // Should contain initial output + new lines
2695        assert!(output.contains("line1"));
2696        assert!(output.contains("line2"));
2697        assert!(output.contains("line3"));
2698        assert!(output.contains("line4"));
2699    }
2700
2701    #[tokio::test]
2702    async fn test_execute_logs_follow_success() {
2703        let base = start_follow_test_server().await;
2704        // Extract host:port from http://127.0.0.1:PORT
2705        let node = base.strip_prefix("http://").unwrap().to_owned();
2706
2707        let cli = Cli {
2708            node,
2709            token: None,
2710            command: Commands::Logs {
2711                name: "test".into(),
2712                lines: 100,
2713                follow: true,
2714            },
2715        };
2716        // execute() with follow writes to stdout and returns empty string
2717        let result = execute(&cli).await.unwrap();
2718        assert_eq!(result, "");
2719    }
2720
2721    #[tokio::test]
2722    async fn test_execute_logs_follow_connection_refused() {
2723        let cli = Cli {
2724            node: "localhost:1".into(),
2725            token: None,
2726            command: Commands::Logs {
2727                name: "test".into(),
2728                lines: 50,
2729                follow: true,
2730            },
2731        };
2732        let result = execute(&cli).await;
2733        let err = result.unwrap_err().to_string();
2734        assert!(
2735            err.contains("Could not connect to pulpod"),
2736            "Expected friendly error, got: {err}"
2737        );
2738    }
2739
2740    #[tokio::test]
2741    async fn test_follow_logs_exits_on_dead() {
2742        use axum::{Router, extract::Path, extract::Query, routing::get};
2743
2744        let app = Router::new()
2745            .route(
2746                "/api/v1/sessions/{id}/output",
2747                get(
2748                    |_path: Path<String>,
2749                     _query: Query<std::collections::HashMap<String, String>>| async {
2750                        r#"{"output":"some output"}"#.to_owned()
2751                    },
2752                ),
2753            )
2754            .route(
2755                "/api/v1/sessions/{id}",
2756                get(|_path: Path<String>| async {
2757                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2758                }),
2759            );
2760
2761        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2762        let addr = listener.local_addr().unwrap();
2763        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2764        let base = format!("http://127.0.0.1:{}", addr.port());
2765
2766        let client = reqwest::Client::new();
2767        let mut buf = Vec::new();
2768        follow_logs(&client, &base, "test", 100, None, &mut buf)
2769            .await
2770            .unwrap();
2771
2772        let output = String::from_utf8(buf).unwrap();
2773        assert!(output.contains("some output"));
2774    }
2775
2776    #[tokio::test]
2777    async fn test_follow_logs_exits_on_stale() {
2778        use axum::{Router, extract::Path, extract::Query, routing::get};
2779
2780        let app = Router::new()
2781            .route(
2782                "/api/v1/sessions/{id}/output",
2783                get(
2784                    |_path: Path<String>,
2785                     _query: Query<std::collections::HashMap<String, String>>| async {
2786                        r#"{"output":"stale output"}"#.to_owned()
2787                    },
2788                ),
2789            )
2790            .route(
2791                "/api/v1/sessions/{id}",
2792                get(|_path: Path<String>| async {
2793                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2794                }),
2795            );
2796
2797        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2798        let addr = listener.local_addr().unwrap();
2799        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2800        let base = format!("http://127.0.0.1:{}", addr.port());
2801
2802        let client = reqwest::Client::new();
2803        let mut buf = Vec::new();
2804        follow_logs(&client, &base, "test", 100, None, &mut buf)
2805            .await
2806            .unwrap();
2807
2808        let output = String::from_utf8(buf).unwrap();
2809        assert!(output.contains("stale output"));
2810    }
2811
2812    #[tokio::test]
2813    async fn test_execute_logs_follow_non_reqwest_error() {
2814        use axum::{Router, extract::Path, extract::Query, routing::get};
2815
2816        // Session status endpoint returns invalid JSON to trigger a serde error
2817        let app = Router::new()
2818            .route(
2819                "/api/v1/sessions/{id}/output",
2820                get(
2821                    |_path: Path<String>,
2822                     _query: Query<std::collections::HashMap<String, String>>| async {
2823                        r#"{"output":"initial"}"#.to_owned()
2824                    },
2825                ),
2826            )
2827            .route(
2828                "/api/v1/sessions/{id}",
2829                get(|_path: Path<String>| async { "not valid json".to_owned() }),
2830            );
2831
2832        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2833        let addr = listener.local_addr().unwrap();
2834        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2835        let node = format!("127.0.0.1:{}", addr.port());
2836
2837        let cli = Cli {
2838            node,
2839            token: None,
2840            command: Commands::Logs {
2841                name: "test".into(),
2842                lines: 100,
2843                follow: true,
2844            },
2845        };
2846        let err = execute(&cli).await.unwrap_err();
2847        // serde_json error, not a reqwest error — hits the Err(other) branch
2848        let msg = err.to_string();
2849        assert!(
2850            msg.contains("expected ident"),
2851            "Expected serde parse error, got: {msg}"
2852        );
2853    }
2854
2855    #[test]
2856    fn test_cli_parse_spawn_with_guardrails() {
2857        let cli = Cli::try_parse_from([
2858            "pulpo",
2859            "spawn",
2860            "--workdir",
2861            "/tmp",
2862            "--max-turns",
2863            "10",
2864            "--max-budget",
2865            "5.5",
2866            "--output-format",
2867            "json",
2868            "Do it",
2869        ])
2870        .unwrap();
2871        assert!(matches!(
2872            &cli.command,
2873            Commands::Spawn { max_turns, max_budget, output_format, .. }
2874                if *max_turns == Some(10) && *max_budget == Some(5.5)
2875                && output_format.as_deref() == Some("json")
2876        ));
2877    }
2878
2879    #[tokio::test]
2880    async fn test_fetch_session_status_connection_error() {
2881        let client = reqwest::Client::new();
2882        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
2883        assert!(result.is_err());
2884    }
2885
2886    // -- Crontab wrapper tests --
2887
2888    #[test]
2889    fn test_build_crontab_line() {
2890        let line = build_crontab_line(
2891            "nightly-review",
2892            "0 3 * * *",
2893            "/home/me/repo",
2894            "claude",
2895            "Review PRs",
2896            "localhost:7433",
2897        );
2898        assert_eq!(
2899            line,
2900            "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n"
2901        );
2902    }
2903
2904    #[test]
2905    fn test_crontab_install_success() {
2906        let crontab = "# existing cron\n0 * * * * echo hi\n";
2907        let line = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";
2908        let result = crontab_install(crontab, "my-job", line).unwrap();
2909        assert!(result.starts_with("# existing cron\n"));
2910        assert!(result.ends_with("#pulpo:my-job\n"));
2911        assert!(result.contains("echo hi"));
2912    }
2913
2914    #[test]
2915    fn test_crontab_install_duplicate_error() {
2916        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
2917        let line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
2918        let err = crontab_install(crontab, "my-job", line).unwrap_err();
2919        assert!(err.to_string().contains("already exists"));
2920    }
2921
2922    #[test]
2923    fn test_crontab_list_empty() {
2924        assert_eq!(crontab_list(""), "No pulpo schedules.");
2925    }
2926
2927    #[test]
2928    fn test_crontab_list_no_pulpo_entries() {
2929        assert_eq!(crontab_list("0 * * * * echo hi\n"), "No pulpo schedules.");
2930    }
2931
2932    #[test]
2933    fn test_crontab_list_with_entries() {
2934        let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
2935        let output = crontab_list(crontab);
2936        assert!(output.contains("NAME"));
2937        assert!(output.contains("CRON"));
2938        assert!(output.contains("PAUSED"));
2939        assert!(output.contains("nightly"));
2940        assert!(output.contains("0 3 * * *"));
2941        assert!(output.contains("no"));
2942    }
2943
2944    #[test]
2945    fn test_crontab_list_paused_entry() {
2946        let crontab = "#0 3 * * * pulpo spawn task #pulpo:paused-job\n";
2947        let output = crontab_list(crontab);
2948        assert!(output.contains("paused-job"));
2949        assert!(output.contains("yes"));
2950    }
2951
2952    #[test]
2953    fn test_crontab_list_short_line() {
2954        // A line with fewer than 5 space-separated parts but still tagged
2955        let crontab = "badcron #pulpo:broken\n";
2956        let output = crontab_list(crontab);
2957        assert!(output.contains("broken"));
2958        assert!(output.contains('?'));
2959    }
2960
2961    #[test]
2962    fn test_crontab_remove_success() {
2963        let crontab = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
2964        let result = crontab_remove(crontab, "my-job").unwrap();
2965        assert!(result.contains("echo hi"));
2966        assert!(!result.contains("my-job"));
2967    }
2968
2969    #[test]
2970    fn test_crontab_remove_not_found() {
2971        let crontab = "0 * * * * echo hi\n";
2972        let err = crontab_remove(crontab, "ghost").unwrap_err();
2973        assert!(err.to_string().contains("not found"));
2974    }
2975
2976    #[test]
2977    fn test_crontab_pause_success() {
2978        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
2979        let result = crontab_pause(crontab, "my-job").unwrap();
2980        assert!(result.starts_with('#'));
2981        assert!(result.contains("#pulpo:my-job"));
2982    }
2983
2984    #[test]
2985    fn test_crontab_pause_not_found() {
2986        let crontab = "0 * * * * echo hi\n";
2987        let err = crontab_pause(crontab, "ghost").unwrap_err();
2988        assert!(err.to_string().contains("not found or already paused"));
2989    }
2990
2991    #[test]
2992    fn test_crontab_pause_already_paused() {
2993        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
2994        let err = crontab_pause(crontab, "my-job").unwrap_err();
2995        assert!(err.to_string().contains("already paused"));
2996    }
2997
2998    #[test]
2999    fn test_crontab_resume_success() {
3000        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3001        let result = crontab_resume(crontab, "my-job").unwrap();
3002        assert!(!result.starts_with('#'));
3003        assert!(result.contains("#pulpo:my-job"));
3004    }
3005
3006    #[test]
3007    fn test_crontab_resume_not_found() {
3008        let crontab = "0 * * * * echo hi\n";
3009        let err = crontab_resume(crontab, "ghost").unwrap_err();
3010        assert!(err.to_string().contains("not found or not paused"));
3011    }
3012
3013    #[test]
3014    fn test_crontab_resume_not_paused() {
3015        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3016        let err = crontab_resume(crontab, "my-job").unwrap_err();
3017        assert!(err.to_string().contains("not paused"));
3018    }
3019
3020    // -- Schedule CLI parse tests --
3021
3022    #[test]
3023    fn test_cli_parse_schedule_install() {
3024        let cli = Cli::try_parse_from([
3025            "pulpo",
3026            "schedule",
3027            "install",
3028            "nightly",
3029            "0 3 * * *",
3030            "--workdir",
3031            "/tmp/repo",
3032            "Review",
3033            "PRs",
3034        ])
3035        .unwrap();
3036        assert!(matches!(
3037            &cli.command,
3038            Commands::Schedule {
3039                action: ScheduleAction::Install { name, cron, workdir, provider, prompt }
3040            } if name == "nightly" && cron == "0 3 * * *" && workdir == "/tmp/repo"
3041              && provider == "claude" && prompt == &["Review", "PRs"]
3042        ));
3043    }
3044
3045    #[test]
3046    fn test_cli_parse_schedule_list() {
3047        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3048        assert!(matches!(
3049            &cli.command,
3050            Commands::Schedule {
3051                action: ScheduleAction::List
3052            }
3053        ));
3054    }
3055
3056    #[test]
3057    fn test_cli_parse_schedule_remove() {
3058        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3059        assert!(matches!(
3060            &cli.command,
3061            Commands::Schedule {
3062                action: ScheduleAction::Remove { name }
3063            } if name == "nightly"
3064        ));
3065    }
3066
3067    #[test]
3068    fn test_cli_parse_schedule_pause() {
3069        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3070        assert!(matches!(
3071            &cli.command,
3072            Commands::Schedule {
3073                action: ScheduleAction::Pause { name }
3074            } if name == "nightly"
3075        ));
3076    }
3077
3078    #[test]
3079    fn test_cli_parse_schedule_resume() {
3080        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3081        assert!(matches!(
3082            &cli.command,
3083            Commands::Schedule {
3084                action: ScheduleAction::Resume { name }
3085            } if name == "nightly"
3086        ));
3087    }
3088
3089    #[test]
3090    fn test_cli_parse_schedule_alias() {
3091        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3092        assert!(matches!(
3093            &cli.command,
3094            Commands::Schedule {
3095                action: ScheduleAction::List
3096            }
3097        ));
3098    }
3099
3100    #[test]
3101    fn test_cli_parse_schedule_list_alias() {
3102        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3103        assert!(matches!(
3104            &cli.command,
3105            Commands::Schedule {
3106                action: ScheduleAction::List
3107            }
3108        ));
3109    }
3110
3111    #[test]
3112    fn test_cli_parse_schedule_remove_alias() {
3113        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3114        assert!(matches!(
3115            &cli.command,
3116            Commands::Schedule {
3117                action: ScheduleAction::Remove { name }
3118            } if name == "nightly"
3119        ));
3120    }
3121
3122    #[test]
3123    fn test_cli_parse_schedule_install_custom_provider() {
3124        let cli = Cli::try_parse_from([
3125            "pulpo",
3126            "schedule",
3127            "install",
3128            "daily",
3129            "0 9 * * *",
3130            "--workdir",
3131            "/tmp",
3132            "--provider",
3133            "codex",
3134            "Run tests",
3135        ])
3136        .unwrap();
3137        assert!(matches!(
3138            &cli.command,
3139            Commands::Schedule {
3140                action: ScheduleAction::Install { provider, .. }
3141            } if provider == "codex"
3142        ));
3143    }
3144
3145    #[tokio::test]
3146    async fn test_execute_schedule_via_execute() {
3147        // Under coverage builds, execute_schedule is a stub returning Ok("")
3148        let node = start_test_server().await;
3149        let cli = Cli {
3150            node,
3151            token: None,
3152            command: Commands::Schedule {
3153                action: ScheduleAction::List,
3154            },
3155        };
3156        let result = execute(&cli).await;
3157        // Under coverage: succeeds with empty string; under non-coverage: may fail (no crontab)
3158        assert!(result.is_ok() || result.is_err());
3159    }
3160
3161    #[test]
3162    fn test_schedule_action_debug() {
3163        let action = ScheduleAction::List;
3164        assert_eq!(format!("{action:?}"), "List");
3165    }
3166}