Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{AuthTokenResponse, InterventionEventResponse, PeersResponse};
4use pulpo_common::session::Session;
5
6#[derive(Parser, Debug)]
7#[command(
8    name = "pulpo",
9    about = "Manage agent sessions across your machines",
10    version
11)]
12pub struct Cli {
13    /// Target node (default: localhost)
14    #[arg(long, default_value = "localhost:7433")]
15    pub node: String,
16
17    /// Auth token (auto-discovered from local daemon if omitted)
18    #[arg(long)]
19    pub token: Option<String>,
20
21    #[command(subcommand)]
22    pub command: Commands,
23}
24
25#[derive(Subcommand, Debug)]
26#[allow(clippy::large_enum_variant)]
27pub enum Commands {
28    /// Attach to a session's terminal
29    #[command(visible_alias = "a")]
30    Attach {
31        /// Session name or ID
32        name: String,
33    },
34
35    /// Send input to a session
36    #[command(visible_alias = "i")]
37    Input {
38        /// Session name or ID
39        name: String,
40        /// Text to send (sends Enter if omitted)
41        text: Option<String>,
42    },
43
44    /// Spawn a new agent session
45    #[command(visible_alias = "s")]
46    Spawn {
47        /// Working directory
48        #[arg(long)]
49        workdir: String,
50
51        /// Session name (auto-derived from workdir if omitted)
52        #[arg(long)]
53        name: Option<String>,
54
55        /// Agent provider (claude, codex)
56        #[arg(long, default_value = "claude")]
57        provider: String,
58
59        /// Run in autonomous mode (fire-and-forget)
60        #[arg(long)]
61        auto: bool,
62
63        /// Guard preset (strict, standard, unrestricted)
64        #[arg(long, default_value = "standard")]
65        guard: String,
66
67        /// Model override (e.g. opus, sonnet)
68        #[arg(long)]
69        model: Option<String>,
70
71        /// System prompt to append
72        #[arg(long)]
73        system_prompt: Option<String>,
74
75        /// Explicit allowed tools (comma-separated)
76        #[arg(long, value_delimiter = ',')]
77        allowed_tools: Option<Vec<String>>,
78
79        /// Persona name (from config)
80        #[arg(long)]
81        persona: Option<String>,
82
83        /// Maximum agent turns before stopping
84        #[arg(long)]
85        max_turns: Option<u32>,
86
87        /// Maximum budget in USD before stopping
88        #[arg(long)]
89        max_budget: Option<f64>,
90
91        /// Output format (e.g. json, stream-json)
92        #[arg(long)]
93        output_format: Option<String>,
94
95        /// Task prompt
96        prompt: Vec<String>,
97    },
98
99    /// List all sessions
100    #[command(visible_alias = "ls")]
101    List,
102
103    /// Show session logs/output
104    #[command(visible_alias = "l")]
105    Logs {
106        /// Session name or ID
107        name: String,
108
109        /// Number of lines to fetch
110        #[arg(long, default_value = "100")]
111        lines: usize,
112
113        /// Follow output (like `tail -f`)
114        #[arg(short, long)]
115        follow: bool,
116    },
117
118    /// Kill a session
119    #[command(visible_alias = "k")]
120    Kill {
121        /// Session name or ID
122        name: String,
123    },
124
125    /// Permanently remove a session from history
126    #[command(visible_alias = "rm")]
127    Delete {
128        /// Session name or ID
129        name: String,
130    },
131
132    /// Resume a stale session
133    #[command(visible_alias = "r")]
134    Resume {
135        /// Session name or ID
136        name: String,
137    },
138
139    /// List all known nodes
140    #[command(visible_alias = "n")]
141    Nodes,
142
143    /// Show intervention history for a session
144    #[command(visible_alias = "iv")]
145    Interventions {
146        /// Session name or ID
147        name: String,
148    },
149
150    /// Open the web dashboard in your browser
151    Ui,
152
153    /// Manage scheduled agent runs via crontab
154    #[command(visible_alias = "sched")]
155    Schedule {
156        #[command(subcommand)]
157        action: ScheduleAction,
158    },
159}
160
161#[derive(Subcommand, Debug)]
162pub enum ScheduleAction {
163    /// Install a cron schedule that spawns a session
164    Install {
165        /// Schedule name
166        name: String,
167        /// Cron expression (e.g. "0 3 * * *")
168        cron: String,
169        /// Working directory
170        #[arg(long)]
171        workdir: String,
172        /// Agent provider
173        #[arg(long, default_value = "claude")]
174        provider: String,
175        /// Task prompt
176        prompt: Vec<String>,
177    },
178    /// List installed pulpo cron schedules
179    #[command(alias = "ls")]
180    List,
181    /// Remove a cron schedule
182    #[command(alias = "rm")]
183    Remove {
184        /// Schedule name
185        name: String,
186    },
187    /// Pause a cron schedule (comments out the line)
188    Pause {
189        /// Schedule name
190        name: String,
191    },
192    /// Resume a paused cron schedule (uncomments the line)
193    Resume {
194        /// Schedule name
195        name: String,
196    },
197}
198
199/// Format the base URL from the node address.
200pub fn base_url(node: &str) -> String {
201    format!("http://{node}")
202}
203
204/// Response shape for the output endpoint.
205#[derive(serde::Deserialize)]
206struct OutputResponse {
207    output: String,
208}
209
210/// Format a list of sessions as a table.
211fn format_sessions(sessions: &[Session]) -> String {
212    if sessions.is_empty() {
213        return "No sessions.".into();
214    }
215    let mut lines = vec![format!(
216        "{:<20} {:<12} {:<10} {:<14} {}",
217        "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
218    )];
219    for s in sessions {
220        let prompt_display = if s.prompt.len() > 40 {
221            format!("{}...", &s.prompt[..37])
222        } else {
223            s.prompt.clone()
224        };
225        let status_display = if s.waiting_for_input {
226            "waiting".to_owned()
227        } else {
228            s.status.to_string()
229        };
230        lines.push(format!(
231            "{:<20} {:<12} {:<10} {:<14} {}",
232            s.name, status_display, s.provider, s.mode, prompt_display
233        ));
234    }
235    lines.join("\n")
236}
237
238/// Format the peers response as a table.
239fn format_nodes(resp: &PeersResponse) -> String {
240    let mut lines = vec![format!(
241        "{:<20} {:<25} {:<10} {}",
242        "NAME", "ADDRESS", "STATUS", "SESSIONS"
243    )];
244    lines.push(format!(
245        "{:<20} {:<25} {:<10} {}",
246        resp.local.name, "(local)", "online", "-"
247    ));
248    for p in &resp.peers {
249        let sessions = p
250            .session_count
251            .map_or_else(|| "-".into(), |c| c.to_string());
252        lines.push(format!(
253            "{:<20} {:<25} {:<10} {}",
254            p.name, p.address, p.status, sessions
255        ));
256    }
257    lines.join("\n")
258}
259
260/// Format intervention events as a table.
261fn format_interventions(events: &[InterventionEventResponse]) -> String {
262    if events.is_empty() {
263        return "No intervention events.".into();
264    }
265    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
266    for e in events {
267        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
268    }
269    lines.join("\n")
270}
271
272/// Build the command to open a URL in the default browser.
273#[cfg_attr(coverage, allow(dead_code))]
274fn build_open_command(url: &str) -> std::process::Command {
275    #[cfg(target_os = "macos")]
276    {
277        let mut cmd = std::process::Command::new("open");
278        cmd.arg(url);
279        cmd
280    }
281    #[cfg(target_os = "linux")]
282    {
283        let mut cmd = std::process::Command::new("xdg-open");
284        cmd.arg(url);
285        cmd
286    }
287    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
288    {
289        // Fallback: try xdg-open
290        let mut cmd = std::process::Command::new("xdg-open");
291        cmd.arg(url);
292        cmd
293    }
294}
295
296/// Open a URL in the default browser.
297#[cfg(not(coverage))]
298fn open_browser(url: &str) -> Result<()> {
299    build_open_command(url).status()?;
300    Ok(())
301}
302
303/// Stub for coverage builds — avoids opening a browser during tests.
304#[cfg(coverage)]
305fn open_browser(_url: &str) -> Result<()> {
306    Ok(())
307}
308
309/// Build the command to attach to a session's terminal.
310/// Currently uses tmux directly — the CLI is always local to the backend.
311#[cfg_attr(coverage, allow(dead_code))]
312fn build_attach_command(name: &str) -> std::process::Command {
313    let mut cmd = std::process::Command::new("tmux");
314    cmd.args(["attach-session", "-t", &format!("pulpo-{name}")]);
315    cmd
316}
317
318/// Attach to a session's terminal.
319#[cfg(not(any(test, coverage)))]
320fn attach_session(name: &str) -> Result<()> {
321    let status = build_attach_command(name).status()?;
322    if !status.success() {
323        anyhow::bail!("attach failed with {status}");
324    }
325    Ok(())
326}
327
328/// Stub for test and coverage builds — avoids spawning real terminals during tests.
329#[cfg(any(test, coverage))]
330#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
331fn attach_session(_name: &str) -> Result<()> {
332    Ok(())
333}
334
335/// Extract a clean error message from an API JSON response (or fall back to raw text).
336fn api_error(text: &str) -> anyhow::Error {
337    serde_json::from_str::<serde_json::Value>(text)
338        .ok()
339        .and_then(|v| v["error"].as_str().map(String::from))
340        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
341}
342
343/// Return the response body text, or a clean error if the response was non-success.
344async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
345    if resp.status().is_success() {
346        Ok(resp.text().await?)
347    } else {
348        let text = resp.text().await?;
349        Err(api_error(&text))
350    }
351}
352
353/// Map a reqwest error to a user-friendly message.
354fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
355    if err.is_connect() {
356        anyhow::anyhow!(
357            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
358        )
359    } else {
360        anyhow::anyhow!("Network error connecting to {node}: {err}")
361    }
362}
363
364/// Check if the node address points to localhost.
365fn is_localhost(node: &str) -> bool {
366    let host = node.split(':').next().unwrap_or(node);
367    host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
368}
369
370/// Try to auto-discover the auth token from a local daemon.
371async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
372    let resp = client
373        .get(format!("{base}/api/v1/auth/token"))
374        .send()
375        .await
376        .ok()?;
377    let body: AuthTokenResponse = resp.json().await.ok()?;
378    if body.token.is_empty() {
379        None
380    } else {
381        Some(body.token)
382    }
383}
384
385/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
386async fn resolve_token(
387    client: &reqwest::Client,
388    base: &str,
389    node: &str,
390    explicit: Option<&str>,
391) -> Option<String> {
392    if let Some(t) = explicit {
393        return Some(t.to_owned());
394    }
395    if is_localhost(node) {
396        return discover_token(client, base).await;
397    }
398    None
399}
400
401/// Build an authenticated GET request.
402fn authed_get(
403    client: &reqwest::Client,
404    url: String,
405    token: Option<&str>,
406) -> reqwest::RequestBuilder {
407    let req = client.get(url);
408    if let Some(t) = token {
409        req.bearer_auth(t)
410    } else {
411        req
412    }
413}
414
415/// Build an authenticated POST request.
416fn authed_post(
417    client: &reqwest::Client,
418    url: String,
419    token: Option<&str>,
420) -> reqwest::RequestBuilder {
421    let req = client.post(url);
422    if let Some(t) = token {
423        req.bearer_auth(t)
424    } else {
425        req
426    }
427}
428
429/// Build an authenticated DELETE request.
430fn authed_delete(
431    client: &reqwest::Client,
432    url: String,
433    token: Option<&str>,
434) -> reqwest::RequestBuilder {
435    let req = client.delete(url);
436    if let Some(t) = token {
437        req.bearer_auth(t)
438    } else {
439        req
440    }
441}
442
443/// Fetch session output from the API.
444async fn fetch_output(
445    client: &reqwest::Client,
446    base: &str,
447    name: &str,
448    lines: usize,
449    token: Option<&str>,
450) -> Result<String> {
451    let resp = authed_get(
452        client,
453        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
454        token,
455    )
456    .send()
457    .await?;
458    let text = ok_or_api_error(resp).await?;
459    let output: OutputResponse = serde_json::from_str(&text)?;
460    Ok(output.output)
461}
462
463/// Fetch session status from the API.
464async fn fetch_session_status(
465    client: &reqwest::Client,
466    base: &str,
467    name: &str,
468    token: Option<&str>,
469) -> Result<String> {
470    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
471        .send()
472        .await?;
473    let text = ok_or_api_error(resp).await?;
474    let session: Session = serde_json::from_str(&text)?;
475    Ok(session.status.to_string())
476}
477
478/// Compute the new trailing lines that differ from the previous output.
479///
480/// The output endpoint returns the last N lines from the terminal pane. As new lines
481/// appear, old lines at the top scroll off. We find the overlap between the end
482/// of `prev` and the beginning-to-middle of `new`, then return only the truly new
483/// trailing lines.
484fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
485    if prev.is_empty() {
486        return new;
487    }
488
489    let prev_lines: Vec<&str> = prev.lines().collect();
490    let new_lines: Vec<&str> = new.lines().collect();
491
492    if new_lines.is_empty() {
493        return "";
494    }
495
496    // prev is non-empty (early return above), so last() always succeeds
497    let last_prev = prev_lines[prev_lines.len() - 1];
498
499    // Find the last line of prev in new to determine the overlap boundary
500    for i in (0..new_lines.len()).rev() {
501        if new_lines[i] == last_prev {
502            // Verify contiguous overlap: check that lines before this match too
503            let overlap_len = prev_lines.len().min(i + 1);
504            let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
505            let new_overlap = &new_lines[i + 1 - overlap_len..=i];
506            if prev_tail == new_overlap {
507                if i + 1 < new_lines.len() {
508                    // Return the slice of `new` after the overlap
509                    let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
510                    return new.get(consumed.min(new.len())..).unwrap_or("");
511                }
512                return "";
513            }
514        }
515    }
516
517    // No overlap found — output changed completely, print it all
518    new
519}
520
521/// Follow logs by polling, printing only new output. Returns when the session ends.
522async fn follow_logs(
523    client: &reqwest::Client,
524    base: &str,
525    name: &str,
526    lines: usize,
527    token: Option<&str>,
528    writer: &mut (dyn std::io::Write + Send),
529) -> Result<()> {
530    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
531    write!(writer, "{prev_output}")?;
532
533    loop {
534        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
535
536        // Check session status
537        let status = fetch_session_status(client, base, name, token).await?;
538        let is_terminal = status == "completed" || status == "dead" || status == "stale";
539
540        // Fetch latest output
541        let new_output = fetch_output(client, base, name, lines, token).await?;
542
543        let diff = diff_output(&prev_output, &new_output);
544        if !diff.is_empty() {
545            write!(writer, "{diff}")?;
546        }
547        prev_output = new_output;
548
549        if is_terminal {
550            break;
551        }
552    }
553    Ok(())
554}
555
556// --- Crontab wrapper ---
557
558#[cfg_attr(coverage, allow(dead_code))]
559const CRONTAB_TAG: &str = "#pulpo:";
560
561/// Read the current crontab. Returns empty string if no crontab exists.
562#[cfg(not(coverage))]
563fn read_crontab() -> Result<String> {
564    let output = std::process::Command::new("crontab").arg("-l").output()?;
565    if output.status.success() {
566        Ok(String::from_utf8_lossy(&output.stdout).to_string())
567    } else {
568        Ok(String::new())
569    }
570}
571
572/// Write the given content as the user's crontab.
573#[cfg(not(coverage))]
574fn write_crontab(content: &str) -> Result<()> {
575    use std::io::Write;
576    let mut child = std::process::Command::new("crontab")
577        .arg("-")
578        .stdin(std::process::Stdio::piped())
579        .spawn()?;
580    child
581        .stdin
582        .as_mut()
583        .unwrap()
584        .write_all(content.as_bytes())?;
585    let status = child.wait()?;
586    if !status.success() {
587        anyhow::bail!("crontab write failed");
588    }
589    Ok(())
590}
591
592/// Build the crontab line for a pulpo schedule.
593#[cfg_attr(coverage, allow(dead_code))]
594fn build_crontab_line(
595    name: &str,
596    cron: &str,
597    workdir: &str,
598    provider: &str,
599    prompt: &str,
600    node: &str,
601) -> String {
602    format!(
603        "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
604    )
605}
606
607/// Install a cron schedule into a crontab string. Returns the updated crontab.
608#[cfg_attr(coverage, allow(dead_code))]
609fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
610    let tag = format!("{CRONTAB_TAG}{name}");
611    if crontab.contains(&tag) {
612        anyhow::bail!("schedule \"{name}\" already exists — remove it first");
613    }
614    let mut result = crontab.to_owned();
615    result.push_str(line);
616    Ok(result)
617}
618
619/// Format pulpo crontab entries for display.
620#[cfg_attr(coverage, allow(dead_code))]
621fn crontab_list(crontab: &str) -> String {
622    let entries: Vec<&str> = crontab
623        .lines()
624        .filter(|l| l.contains(CRONTAB_TAG))
625        .collect();
626    if entries.is_empty() {
627        return "No pulpo schedules.".into();
628    }
629    let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
630    for entry in entries {
631        let paused = entry.starts_with('#');
632        let raw = entry.trim_start_matches('#').trim();
633        let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
634        let parts: Vec<&str> = raw.splitn(6, ' ').collect();
635        let cron_expr = if parts.len() >= 5 {
636            parts[..5].join(" ")
637        } else {
638            "?".into()
639        };
640        lines.push(format!(
641            "{:<20} {:<15} {}",
642            name,
643            cron_expr,
644            if paused { "yes" } else { "no" }
645        ));
646    }
647    lines.join("\n")
648}
649
650/// Remove a schedule from a crontab string. Returns the updated crontab.
651#[cfg_attr(coverage, allow(dead_code))]
652fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
653    use std::fmt::Write;
654    let tag = format!("{CRONTAB_TAG}{name}");
655    let filtered =
656        crontab
657            .lines()
658            .filter(|l| !l.contains(&tag))
659            .fold(String::new(), |mut acc, l| {
660                writeln!(acc, "{l}").unwrap();
661                acc
662            });
663    if filtered.len() == crontab.len() {
664        anyhow::bail!("schedule \"{name}\" not found");
665    }
666    Ok(filtered)
667}
668
669/// Pause (comment out) a schedule in a crontab string.
670#[cfg_attr(coverage, allow(dead_code))]
671fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
672    use std::fmt::Write;
673    let tag = format!("{CRONTAB_TAG}{name}");
674    let mut found = false;
675    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
676        if l.contains(&tag) && !l.starts_with('#') {
677            found = true;
678            writeln!(acc, "#{l}").unwrap();
679        } else {
680            writeln!(acc, "{l}").unwrap();
681        }
682        acc
683    });
684    if !found {
685        anyhow::bail!("schedule \"{name}\" not found or already paused");
686    }
687    Ok(updated)
688}
689
690/// Resume (uncomment) a schedule in a crontab string.
691#[cfg_attr(coverage, allow(dead_code))]
692fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
693    use std::fmt::Write;
694    let tag = format!("{CRONTAB_TAG}{name}");
695    let mut found = false;
696    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
697        if l.contains(&tag) && l.starts_with('#') {
698            found = true;
699            writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
700        } else {
701            writeln!(acc, "{l}").unwrap();
702        }
703        acc
704    });
705    if !found {
706        anyhow::bail!("schedule \"{name}\" not found or not paused");
707    }
708    Ok(updated)
709}
710
711/// Execute a schedule subcommand using the crontab wrapper.
712#[cfg(not(coverage))]
713fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
714    match action {
715        ScheduleAction::Install {
716            name,
717            cron,
718            workdir,
719            provider,
720            prompt,
721        } => {
722            let crontab = read_crontab()?;
723            let joined_prompt = prompt.join(" ");
724            let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
725            let updated = crontab_install(&crontab, name, &line)?;
726            write_crontab(&updated)?;
727            Ok(format!("Installed schedule \"{name}\""))
728        }
729        ScheduleAction::List => {
730            let crontab = read_crontab()?;
731            Ok(crontab_list(&crontab))
732        }
733        ScheduleAction::Remove { name } => {
734            let crontab = read_crontab()?;
735            let updated = crontab_remove(&crontab, name)?;
736            write_crontab(&updated)?;
737            Ok(format!("Removed schedule \"{name}\""))
738        }
739        ScheduleAction::Pause { name } => {
740            let crontab = read_crontab()?;
741            let updated = crontab_pause(&crontab, name)?;
742            write_crontab(&updated)?;
743            Ok(format!("Paused schedule \"{name}\""))
744        }
745        ScheduleAction::Resume { name } => {
746            let crontab = read_crontab()?;
747            let updated = crontab_resume(&crontab, name)?;
748            write_crontab(&updated)?;
749            Ok(format!("Resumed schedule \"{name}\""))
750        }
751    }
752}
753
754/// Stub for coverage builds — crontab is real I/O.
755#[cfg(coverage)]
756fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
757    Ok(String::new())
758}
759
760/// Execute the given CLI command against the specified node.
761#[allow(clippy::too_many_lines)]
762pub async fn execute(cli: &Cli) -> Result<String> {
763    let url = base_url(&cli.node);
764    let client = reqwest::Client::new();
765    let node = &cli.node;
766    let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;
767
768    match &cli.command {
769        Commands::Attach { name } => {
770            attach_session(name)?;
771            Ok(format!("Detached from session {name}."))
772        }
773        Commands::Input { name, text } => {
774            let input_text = text.as_deref().unwrap_or("\n");
775            let body = serde_json::json!({ "text": input_text });
776            let resp = authed_post(
777                &client,
778                format!("{url}/api/v1/sessions/{name}/input"),
779                token.as_deref(),
780            )
781            .json(&body)
782            .send()
783            .await
784            .map_err(|e| friendly_error(&e, node))?;
785            ok_or_api_error(resp).await?;
786            Ok(format!("Sent input to session {name}."))
787        }
788        Commands::List => {
789            let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
790                .send()
791                .await
792                .map_err(|e| friendly_error(&e, node))?;
793            let text = ok_or_api_error(resp).await?;
794            let sessions: Vec<Session> = serde_json::from_str(&text)?;
795            Ok(format_sessions(&sessions))
796        }
797        Commands::Nodes => {
798            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
799                .send()
800                .await
801                .map_err(|e| friendly_error(&e, node))?;
802            let text = ok_or_api_error(resp).await?;
803            let resp: PeersResponse = serde_json::from_str(&text)?;
804            Ok(format_nodes(&resp))
805        }
806        Commands::Spawn {
807            workdir,
808            name,
809            provider,
810            auto,
811            guard,
812            model,
813            system_prompt,
814            allowed_tools,
815            persona,
816            max_turns,
817            max_budget,
818            output_format,
819            prompt,
820        } => {
821            let prompt_text = prompt.join(" ");
822            let mode = if *auto { "autonomous" } else { "interactive" };
823            let mut body = serde_json::json!({
824                "workdir": workdir,
825                "provider": provider,
826                "prompt": prompt_text,
827                "mode": mode,
828                "guard_preset": guard,
829            });
830            if let Some(n) = name {
831                body["name"] = serde_json::json!(n);
832            }
833            if let Some(m) = model {
834                body["model"] = serde_json::json!(m);
835            }
836            if let Some(sp) = system_prompt {
837                body["system_prompt"] = serde_json::json!(sp);
838            }
839            if let Some(tools) = allowed_tools {
840                body["allowed_tools"] = serde_json::json!(tools);
841            }
842            if let Some(p) = persona {
843                body["persona"] = serde_json::json!(p);
844            }
845            if let Some(mt) = max_turns {
846                body["max_turns"] = serde_json::json!(mt);
847            }
848            if let Some(mb) = max_budget {
849                body["max_budget_usd"] = serde_json::json!(mb);
850            }
851            if let Some(of) = output_format {
852                body["output_format"] = serde_json::json!(of);
853            }
854            let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
855                .json(&body)
856                .send()
857                .await
858                .map_err(|e| friendly_error(&e, node))?;
859            let text = ok_or_api_error(resp).await?;
860            let session: Session = serde_json::from_str(&text)?;
861            Ok(format!(
862                "Created session \"{}\" ({})",
863                session.name, session.id
864            ))
865        }
866        Commands::Kill { name } => {
867            let resp = authed_post(
868                &client,
869                format!("{url}/api/v1/sessions/{name}/kill"),
870                token.as_deref(),
871            )
872            .send()
873            .await
874            .map_err(|e| friendly_error(&e, node))?;
875            ok_or_api_error(resp).await?;
876            Ok(format!("Session {name} killed."))
877        }
878        Commands::Delete { name } => {
879            let resp = authed_delete(
880                &client,
881                format!("{url}/api/v1/sessions/{name}"),
882                token.as_deref(),
883            )
884            .send()
885            .await
886            .map_err(|e| friendly_error(&e, node))?;
887            ok_or_api_error(resp).await?;
888            Ok(format!("Session {name} deleted."))
889        }
890        Commands::Logs {
891            name,
892            lines,
893            follow,
894        } => {
895            if *follow {
896                let mut stdout = std::io::stdout();
897                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
898                    .await
899                    .map_err(|e| {
900                        // Unwrap reqwest errors to friendly messages
901                        match e.downcast::<reqwest::Error>() {
902                            Ok(re) => friendly_error(&re, node),
903                            Err(other) => other,
904                        }
905                    })?;
906                Ok(String::new())
907            } else {
908                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
909                    .await
910                    .map_err(|e| match e.downcast::<reqwest::Error>() {
911                        Ok(re) => friendly_error(&re, node),
912                        Err(other) => other,
913                    })?;
914                Ok(output)
915            }
916        }
917        Commands::Interventions { name } => {
918            let resp = authed_get(
919                &client,
920                format!("{url}/api/v1/sessions/{name}/interventions"),
921                token.as_deref(),
922            )
923            .send()
924            .await
925            .map_err(|e| friendly_error(&e, node))?;
926            let text = ok_or_api_error(resp).await?;
927            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
928            Ok(format_interventions(&events))
929        }
930        Commands::Ui => {
931            let dashboard = base_url(&cli.node);
932            open_browser(&dashboard)?;
933            Ok(format!("Opening {dashboard}"))
934        }
935        Commands::Resume { name } => {
936            let resp = authed_post(
937                &client,
938                format!("{url}/api/v1/sessions/{name}/resume"),
939                token.as_deref(),
940            )
941            .send()
942            .await
943            .map_err(|e| friendly_error(&e, node))?;
944            let text = ok_or_api_error(resp).await?;
945            let session: Session = serde_json::from_str(&text)?;
946            Ok(format!("Resumed session \"{}\"", session.name))
947        }
948        Commands::Schedule { action } => execute_schedule(action, node),
949    }
950}
951
952#[cfg(test)]
953mod tests {
954    use super::*;
955
956    #[test]
957    fn test_base_url() {
958        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
959        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
960    }
961
962    #[test]
963    fn test_cli_parse_list() {
964        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
965        assert_eq!(cli.node, "localhost:7433");
966        assert!(matches!(cli.command, Commands::List));
967    }
968
969    #[test]
970    fn test_cli_parse_nodes() {
971        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
972        assert!(matches!(cli.command, Commands::Nodes));
973    }
974
975    #[test]
976    fn test_cli_parse_ui() {
977        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
978        assert!(matches!(cli.command, Commands::Ui));
979    }
980
981    #[test]
982    fn test_cli_parse_ui_custom_node() {
983        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
984        assert!(matches!(cli.command, Commands::Ui));
985        assert_eq!(cli.node, "mac-mini:7433");
986    }
987
988    #[test]
989    fn test_build_open_command() {
990        let cmd = build_open_command("http://localhost:7433");
991        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
992        assert_eq!(args, vec!["http://localhost:7433"]);
993        #[cfg(target_os = "macos")]
994        assert_eq!(cmd.get_program(), "open");
995        #[cfg(target_os = "linux")]
996        assert_eq!(cmd.get_program(), "xdg-open");
997    }
998
999    #[test]
1000    fn test_cli_parse_spawn() {
1001        let cli = Cli::try_parse_from([
1002            "pulpo",
1003            "spawn",
1004            "--workdir",
1005            "/tmp/repo",
1006            "Fix",
1007            "the",
1008            "bug",
1009        ])
1010        .unwrap();
1011        assert!(matches!(
1012            &cli.command,
1013            Commands::Spawn { workdir, provider, auto, guard, prompt, .. }
1014                if workdir == "/tmp/repo" && provider == "claude" && !auto
1015                && guard == "standard" && prompt == &["Fix", "the", "bug"]
1016        ));
1017    }
1018
1019    #[test]
1020    fn test_cli_parse_spawn_with_provider() {
1021        let cli = Cli::try_parse_from([
1022            "pulpo",
1023            "spawn",
1024            "--workdir",
1025            "/tmp",
1026            "--provider",
1027            "codex",
1028            "Do it",
1029        ])
1030        .unwrap();
1031        assert!(matches!(
1032            &cli.command,
1033            Commands::Spawn { provider, .. } if provider == "codex"
1034        ));
1035    }
1036
1037    #[test]
1038    fn test_cli_parse_spawn_auto() {
1039        let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "--auto", "Do it"])
1040            .unwrap();
1041        assert!(matches!(
1042            &cli.command,
1043            Commands::Spawn { auto, .. } if *auto
1044        ));
1045    }
1046
1047    #[test]
1048    fn test_cli_parse_spawn_with_guard() {
1049        let cli = Cli::try_parse_from([
1050            "pulpo",
1051            "spawn",
1052            "--workdir",
1053            "/tmp",
1054            "--guard",
1055            "strict",
1056            "Do it",
1057        ])
1058        .unwrap();
1059        assert!(matches!(
1060            &cli.command,
1061            Commands::Spawn { guard, .. } if guard == "strict"
1062        ));
1063    }
1064
1065    #[test]
1066    fn test_cli_parse_spawn_guard_default() {
1067        let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "Do it"]).unwrap();
1068        assert!(matches!(
1069            &cli.command,
1070            Commands::Spawn { guard, .. } if guard == "standard"
1071        ));
1072    }
1073
1074    #[test]
1075    fn test_cli_parse_spawn_with_name() {
1076        let cli = Cli::try_parse_from([
1077            "pulpo",
1078            "spawn",
1079            "--workdir",
1080            "/tmp/repo",
1081            "--name",
1082            "my-task",
1083            "Fix it",
1084        ])
1085        .unwrap();
1086        assert!(matches!(
1087            &cli.command,
1088            Commands::Spawn { workdir, name, .. }
1089                if workdir == "/tmp/repo" && name.as_deref() == Some("my-task")
1090        ));
1091    }
1092
1093    #[test]
1094    fn test_cli_parse_spawn_without_name() {
1095        let cli =
1096            Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"]).unwrap();
1097        assert!(matches!(
1098            &cli.command,
1099            Commands::Spawn { name, .. } if name.is_none()
1100        ));
1101    }
1102
1103    #[test]
1104    fn test_cli_parse_logs() {
1105        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1106        assert!(matches!(
1107            &cli.command,
1108            Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1109        ));
1110    }
1111
1112    #[test]
1113    fn test_cli_parse_logs_with_lines() {
1114        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1115        assert!(matches!(
1116            &cli.command,
1117            Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 50 && !follow
1118        ));
1119    }
1120
1121    #[test]
1122    fn test_cli_parse_logs_follow() {
1123        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1124        assert!(matches!(
1125            &cli.command,
1126            Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1127        ));
1128    }
1129
1130    #[test]
1131    fn test_cli_parse_logs_follow_short() {
1132        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1133        assert!(matches!(
1134            &cli.command,
1135            Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1136        ));
1137    }
1138
1139    #[test]
1140    fn test_cli_parse_kill() {
1141        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1142        assert!(matches!(
1143            &cli.command,
1144            Commands::Kill { name } if name == "my-session"
1145        ));
1146    }
1147
1148    #[test]
1149    fn test_cli_parse_delete() {
1150        let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1151        assert!(matches!(
1152            &cli.command,
1153            Commands::Delete { name } if name == "my-session"
1154        ));
1155    }
1156
1157    #[test]
1158    fn test_cli_parse_resume() {
1159        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1160        assert!(matches!(
1161            &cli.command,
1162            Commands::Resume { name } if name == "my-session"
1163        ));
1164    }
1165
1166    #[test]
1167    fn test_cli_parse_input() {
1168        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1169        assert!(matches!(
1170            &cli.command,
1171            Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("yes")
1172        ));
1173    }
1174
1175    #[test]
1176    fn test_cli_parse_input_no_text() {
1177        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1178        assert!(matches!(
1179            &cli.command,
1180            Commands::Input { name, text } if name == "my-session" && text.is_none()
1181        ));
1182    }
1183
1184    #[test]
1185    fn test_cli_parse_input_alias() {
1186        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1187        assert!(matches!(
1188            &cli.command,
1189            Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("y")
1190        ));
1191    }
1192
1193    #[test]
1194    fn test_cli_parse_custom_node() {
1195        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1196        assert_eq!(cli.node, "win-pc:8080");
1197    }
1198
1199    #[test]
1200    fn test_cli_version() {
1201        let result = Cli::try_parse_from(["pulpo", "--version"]);
1202        // clap exits with an error (kind DisplayVersion) when --version is used
1203        let err = result.unwrap_err();
1204        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1205    }
1206
1207    #[test]
1208    fn test_cli_parse_no_subcommand_fails() {
1209        let result = Cli::try_parse_from(["pulpo"]);
1210        assert!(result.is_err());
1211    }
1212
1213    #[test]
1214    fn test_cli_debug() {
1215        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1216        let debug = format!("{cli:?}");
1217        assert!(debug.contains("List"));
1218    }
1219
1220    #[test]
1221    fn test_commands_debug() {
1222        let cmd = Commands::List;
1223        assert_eq!(format!("{cmd:?}"), "List");
1224    }
1225
1226    /// A valid Session JSON for test responses.
1227    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","provider":"claude","prompt":"Fix bug","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1228
1229    /// Start a lightweight test HTTP server and return its address.
1230    async fn start_test_server() -> String {
1231        use axum::http::StatusCode;
1232        use axum::{
1233            Json, Router,
1234            routing::{delete, get, post},
1235        };
1236
1237        let session_json = TEST_SESSION_JSON;
1238
1239        let app = Router::new()
1240            .route(
1241                "/api/v1/sessions",
1242                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1243                    (StatusCode::CREATED, session_json.to_owned())
1244                }),
1245            )
1246            .route(
1247                "/api/v1/sessions/{id}",
1248                delete(|| async { StatusCode::NO_CONTENT }),
1249            )
1250            .route(
1251                "/api/v1/sessions/{id}/kill",
1252                post(|| async { StatusCode::NO_CONTENT }),
1253            )
1254            .route(
1255                "/api/v1/sessions/{id}/output",
1256                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1257            )
1258            .route(
1259                "/api/v1/peers",
1260                get(|| async {
1261                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1262                }),
1263            )
1264            .route(
1265                "/api/v1/sessions/{id}/resume",
1266                axum::routing::post(move || async move { session_json.to_owned() }),
1267            )
1268            .route(
1269                "/api/v1/sessions/{id}/interventions",
1270                get(|| async { "[]".to_owned() }),
1271            )
1272            .route(
1273                "/api/v1/sessions/{id}/input",
1274                post(|| async { StatusCode::NO_CONTENT }),
1275            );
1276
1277        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1278        let addr = listener.local_addr().unwrap();
1279        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1280        format!("127.0.0.1:{}", addr.port())
1281    }
1282
1283    #[tokio::test]
1284    async fn test_execute_list_success() {
1285        let node = start_test_server().await;
1286        let cli = Cli {
1287            node,
1288            token: None,
1289            command: Commands::List,
1290        };
1291        let result = execute(&cli).await.unwrap();
1292        assert_eq!(result, "No sessions.");
1293    }
1294
1295    #[tokio::test]
1296    async fn test_execute_nodes_success() {
1297        let node = start_test_server().await;
1298        let cli = Cli {
1299            node,
1300            token: None,
1301            command: Commands::Nodes,
1302        };
1303        let result = execute(&cli).await.unwrap();
1304        assert!(result.contains("test"));
1305        assert!(result.contains("(local)"));
1306        assert!(result.contains("NAME"));
1307    }
1308
1309    #[tokio::test]
1310    async fn test_execute_spawn_success() {
1311        let node = start_test_server().await;
1312        let cli = Cli {
1313            node,
1314            token: None,
1315            command: Commands::Spawn {
1316                workdir: "/tmp/repo".into(),
1317                name: None,
1318                provider: "claude".into(),
1319                auto: false,
1320                guard: "standard".into(),
1321                model: None,
1322                system_prompt: None,
1323                allowed_tools: None,
1324                persona: None,
1325                max_turns: None,
1326                max_budget: None,
1327                output_format: None,
1328                prompt: vec!["Fix".into(), "bug".into()],
1329            },
1330        };
1331        let result = execute(&cli).await.unwrap();
1332        assert!(result.contains("Created session"));
1333        assert!(result.contains("repo"));
1334    }
1335
1336    #[tokio::test]
1337    async fn test_execute_spawn_with_all_new_flags() {
1338        let node = start_test_server().await;
1339        let cli = Cli {
1340            node,
1341            token: None,
1342            command: Commands::Spawn {
1343                workdir: "/tmp/repo".into(),
1344                name: None,
1345                provider: "claude".into(),
1346                auto: false,
1347                guard: "standard".into(),
1348                model: Some("opus".into()),
1349                system_prompt: Some("Be helpful".into()),
1350                allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1351                persona: Some("coder".into()),
1352                max_turns: Some(5),
1353                max_budget: Some(2.5),
1354                output_format: Some("json".into()),
1355                prompt: vec!["Fix".into(), "bug".into()],
1356            },
1357        };
1358        let result = execute(&cli).await.unwrap();
1359        assert!(result.contains("Created session"));
1360    }
1361
1362    #[tokio::test]
1363    async fn test_execute_spawn_auto_mode() {
1364        let node = start_test_server().await;
1365        let cli = Cli {
1366            node,
1367            token: None,
1368            command: Commands::Spawn {
1369                workdir: "/tmp/repo".into(),
1370                name: None,
1371                provider: "claude".into(),
1372                auto: true,
1373                guard: "standard".into(),
1374                model: None,
1375                system_prompt: None,
1376                allowed_tools: None,
1377                persona: None,
1378                max_turns: None,
1379                max_budget: None,
1380                output_format: None,
1381                prompt: vec!["Do it".into()],
1382            },
1383        };
1384        let result = execute(&cli).await.unwrap();
1385        assert!(result.contains("Created session"));
1386    }
1387
1388    #[tokio::test]
1389    async fn test_execute_spawn_with_name() {
1390        let node = start_test_server().await;
1391        let cli = Cli {
1392            node,
1393            token: None,
1394            command: Commands::Spawn {
1395                workdir: "/tmp/repo".into(),
1396                name: Some("my-task".into()),
1397                provider: "claude".into(),
1398                auto: false,
1399                guard: "standard".into(),
1400                model: None,
1401                system_prompt: None,
1402                allowed_tools: None,
1403                persona: None,
1404                max_turns: None,
1405                max_budget: None,
1406                output_format: None,
1407                prompt: vec!["Fix".into(), "bug".into()],
1408            },
1409        };
1410        let result = execute(&cli).await.unwrap();
1411        assert!(result.contains("Created session"));
1412    }
1413
1414    #[tokio::test]
1415    async fn test_execute_kill_success() {
1416        let node = start_test_server().await;
1417        let cli = Cli {
1418            node,
1419            token: None,
1420            command: Commands::Kill {
1421                name: "test-session".into(),
1422            },
1423        };
1424        let result = execute(&cli).await.unwrap();
1425        assert!(result.contains("killed"));
1426    }
1427
1428    #[tokio::test]
1429    async fn test_execute_delete_success() {
1430        let node = start_test_server().await;
1431        let cli = Cli {
1432            node,
1433            token: None,
1434            command: Commands::Delete {
1435                name: "test-session".into(),
1436            },
1437        };
1438        let result = execute(&cli).await.unwrap();
1439        assert!(result.contains("deleted"));
1440    }
1441
1442    #[tokio::test]
1443    async fn test_execute_logs_success() {
1444        let node = start_test_server().await;
1445        let cli = Cli {
1446            node,
1447            token: None,
1448            command: Commands::Logs {
1449                name: "test-session".into(),
1450                lines: 50,
1451                follow: false,
1452            },
1453        };
1454        let result = execute(&cli).await.unwrap();
1455        assert!(result.contains("test output"));
1456    }
1457
1458    #[tokio::test]
1459    async fn test_execute_list_connection_refused() {
1460        let cli = Cli {
1461            node: "localhost:1".into(),
1462            token: None,
1463            command: Commands::List,
1464        };
1465        let result = execute(&cli).await;
1466        let err = result.unwrap_err().to_string();
1467        assert!(
1468            err.contains("Could not connect to pulpod"),
1469            "Expected friendly error, got: {err}"
1470        );
1471        assert!(err.contains("localhost:1"));
1472    }
1473
1474    #[tokio::test]
1475    async fn test_execute_nodes_connection_refused() {
1476        let cli = Cli {
1477            node: "localhost:1".into(),
1478            token: None,
1479            command: Commands::Nodes,
1480        };
1481        let result = execute(&cli).await;
1482        let err = result.unwrap_err().to_string();
1483        assert!(err.contains("Could not connect to pulpod"));
1484    }
1485
1486    #[tokio::test]
1487    async fn test_execute_kill_error_response() {
1488        use axum::{Router, http::StatusCode, routing::post};
1489
1490        let app = Router::new().route(
1491            "/api/v1/sessions/{id}/kill",
1492            post(|| async {
1493                (
1494                    StatusCode::NOT_FOUND,
1495                    "{\"error\":\"session not found: test-session\"}",
1496                )
1497            }),
1498        );
1499        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1500        let addr = listener.local_addr().unwrap();
1501        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1502        let node = format!("127.0.0.1:{}", addr.port());
1503
1504        let cli = Cli {
1505            node,
1506            token: None,
1507            command: Commands::Kill {
1508                name: "test-session".into(),
1509            },
1510        };
1511        let err = execute(&cli).await.unwrap_err();
1512        assert_eq!(err.to_string(), "session not found: test-session");
1513    }
1514
1515    #[tokio::test]
1516    async fn test_execute_delete_error_response() {
1517        use axum::{Router, http::StatusCode, routing::delete};
1518
1519        let app = Router::new().route(
1520            "/api/v1/sessions/{id}",
1521            delete(|| async {
1522                (
1523                    StatusCode::CONFLICT,
1524                    "{\"error\":\"cannot delete session in 'running' state\"}",
1525                )
1526            }),
1527        );
1528        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1529        let addr = listener.local_addr().unwrap();
1530        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1531        let node = format!("127.0.0.1:{}", addr.port());
1532
1533        let cli = Cli {
1534            node,
1535            token: None,
1536            command: Commands::Delete {
1537                name: "test-session".into(),
1538            },
1539        };
1540        let err = execute(&cli).await.unwrap_err();
1541        assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1542    }
1543
1544    #[tokio::test]
1545    async fn test_execute_logs_error_response() {
1546        use axum::{Router, http::StatusCode, routing::get};
1547
1548        let app = Router::new().route(
1549            "/api/v1/sessions/{id}/output",
1550            get(|| async {
1551                (
1552                    StatusCode::NOT_FOUND,
1553                    "{\"error\":\"session not found: ghost\"}",
1554                )
1555            }),
1556        );
1557        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1558        let addr = listener.local_addr().unwrap();
1559        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1560        let node = format!("127.0.0.1:{}", addr.port());
1561
1562        let cli = Cli {
1563            node,
1564            token: None,
1565            command: Commands::Logs {
1566                name: "ghost".into(),
1567                lines: 50,
1568                follow: false,
1569            },
1570        };
1571        let err = execute(&cli).await.unwrap_err();
1572        assert_eq!(err.to_string(), "session not found: ghost");
1573    }
1574
1575    #[tokio::test]
1576    async fn test_execute_resume_error_response() {
1577        use axum::{Router, http::StatusCode, routing::post};
1578
1579        let app = Router::new().route(
1580            "/api/v1/sessions/{id}/resume",
1581            post(|| async {
1582                (
1583                    StatusCode::BAD_REQUEST,
1584                    "{\"error\":\"session is not stale (status: running)\"}",
1585                )
1586            }),
1587        );
1588        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1589        let addr = listener.local_addr().unwrap();
1590        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1591        let node = format!("127.0.0.1:{}", addr.port());
1592
1593        let cli = Cli {
1594            node,
1595            token: None,
1596            command: Commands::Resume {
1597                name: "test-session".into(),
1598            },
1599        };
1600        let err = execute(&cli).await.unwrap_err();
1601        assert_eq!(err.to_string(), "session is not stale (status: running)");
1602    }
1603
1604    #[tokio::test]
1605    async fn test_execute_spawn_error_response() {
1606        use axum::{Router, http::StatusCode, routing::post};
1607
1608        let app = Router::new().route(
1609            "/api/v1/sessions",
1610            post(|| async {
1611                (
1612                    StatusCode::INTERNAL_SERVER_ERROR,
1613                    "{\"error\":\"failed to spawn session\"}",
1614                )
1615            }),
1616        );
1617        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1618        let addr = listener.local_addr().unwrap();
1619        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1620        let node = format!("127.0.0.1:{}", addr.port());
1621
1622        let cli = Cli {
1623            node,
1624            token: None,
1625            command: Commands::Spawn {
1626                workdir: "/tmp/repo".into(),
1627                name: None,
1628                provider: "claude".into(),
1629                auto: false,
1630                guard: "standard".into(),
1631                model: None,
1632                system_prompt: None,
1633                allowed_tools: None,
1634                persona: None,
1635                max_turns: None,
1636                max_budget: None,
1637                output_format: None,
1638                prompt: vec!["test".into()],
1639            },
1640        };
1641        let err = execute(&cli).await.unwrap_err();
1642        assert_eq!(err.to_string(), "failed to spawn session");
1643    }
1644
1645    #[tokio::test]
1646    async fn test_execute_interventions_error_response() {
1647        use axum::{Router, http::StatusCode, routing::get};
1648
1649        let app = Router::new().route(
1650            "/api/v1/sessions/{id}/interventions",
1651            get(|| async {
1652                (
1653                    StatusCode::NOT_FOUND,
1654                    "{\"error\":\"session not found: ghost\"}",
1655                )
1656            }),
1657        );
1658        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1659        let addr = listener.local_addr().unwrap();
1660        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1661        let node = format!("127.0.0.1:{}", addr.port());
1662
1663        let cli = Cli {
1664            node,
1665            token: None,
1666            command: Commands::Interventions {
1667                name: "ghost".into(),
1668            },
1669        };
1670        let err = execute(&cli).await.unwrap_err();
1671        assert_eq!(err.to_string(), "session not found: ghost");
1672    }
1673
1674    #[tokio::test]
1675    async fn test_execute_resume_success() {
1676        let node = start_test_server().await;
1677        let cli = Cli {
1678            node,
1679            token: None,
1680            command: Commands::Resume {
1681                name: "test-session".into(),
1682            },
1683        };
1684        let result = execute(&cli).await.unwrap();
1685        assert!(result.contains("Resumed session"));
1686        assert!(result.contains("repo"));
1687    }
1688
1689    #[tokio::test]
1690    async fn test_execute_input_success() {
1691        let node = start_test_server().await;
1692        let cli = Cli {
1693            node,
1694            token: None,
1695            command: Commands::Input {
1696                name: "test-session".into(),
1697                text: Some("yes".into()),
1698            },
1699        };
1700        let result = execute(&cli).await.unwrap();
1701        assert!(result.contains("Sent input to session test-session"));
1702    }
1703
1704    #[tokio::test]
1705    async fn test_execute_input_no_text() {
1706        let node = start_test_server().await;
1707        let cli = Cli {
1708            node,
1709            token: None,
1710            command: Commands::Input {
1711                name: "test-session".into(),
1712                text: None,
1713            },
1714        };
1715        let result = execute(&cli).await.unwrap();
1716        assert!(result.contains("Sent input to session test-session"));
1717    }
1718
1719    #[tokio::test]
1720    async fn test_execute_input_connection_refused() {
1721        let cli = Cli {
1722            node: "localhost:1".into(),
1723            token: None,
1724            command: Commands::Input {
1725                name: "test".into(),
1726                text: Some("y".into()),
1727            },
1728        };
1729        let result = execute(&cli).await;
1730        let err = result.unwrap_err().to_string();
1731        assert!(err.contains("Could not connect to pulpod"));
1732    }
1733
1734    #[tokio::test]
1735    async fn test_execute_input_error_response() {
1736        use axum::{Router, http::StatusCode, routing::post};
1737
1738        let app = Router::new().route(
1739            "/api/v1/sessions/{id}/input",
1740            post(|| async {
1741                (
1742                    StatusCode::NOT_FOUND,
1743                    "{\"error\":\"session not found: ghost\"}",
1744                )
1745            }),
1746        );
1747        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1748        let addr = listener.local_addr().unwrap();
1749        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1750        let node = format!("127.0.0.1:{}", addr.port());
1751
1752        let cli = Cli {
1753            node,
1754            token: None,
1755            command: Commands::Input {
1756                name: "ghost".into(),
1757                text: Some("y".into()),
1758            },
1759        };
1760        let err = execute(&cli).await.unwrap_err();
1761        assert_eq!(err.to_string(), "session not found: ghost");
1762    }
1763
1764    #[tokio::test]
1765    async fn test_execute_ui() {
1766        let cli = Cli {
1767            node: "localhost:7433".into(),
1768            token: None,
1769            command: Commands::Ui,
1770        };
1771        let result = execute(&cli).await.unwrap();
1772        assert!(result.contains("Opening"));
1773        assert!(result.contains("http://localhost:7433"));
1774    }
1775
1776    #[tokio::test]
1777    async fn test_execute_ui_custom_node() {
1778        let cli = Cli {
1779            node: "mac-mini:7433".into(),
1780            token: None,
1781            command: Commands::Ui,
1782        };
1783        let result = execute(&cli).await.unwrap();
1784        assert!(result.contains("http://mac-mini:7433"));
1785    }
1786
1787    #[test]
1788    fn test_format_sessions_empty() {
1789        assert_eq!(format_sessions(&[]), "No sessions.");
1790    }
1791
1792    #[test]
1793    fn test_format_sessions_with_data() {
1794        use chrono::Utc;
1795        use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1796        use uuid::Uuid;
1797
1798        let sessions = vec![Session {
1799            id: Uuid::nil(),
1800            name: "my-api".into(),
1801            workdir: "/tmp/repo".into(),
1802            provider: Provider::Claude,
1803            prompt: "Fix the bug".into(),
1804            status: SessionStatus::Running,
1805            mode: SessionMode::Interactive,
1806            conversation_id: None,
1807            exit_code: None,
1808            backend_session_id: None,
1809            output_snapshot: None,
1810            guard_config: None,
1811            model: None,
1812            allowed_tools: None,
1813            system_prompt: None,
1814            metadata: None,
1815            persona: None,
1816            max_turns: None,
1817            max_budget_usd: None,
1818            output_format: None,
1819            intervention_reason: None,
1820            intervention_at: None,
1821            last_output_at: None,
1822            idle_since: None,
1823            waiting_for_input: false,
1824            created_at: Utc::now(),
1825            updated_at: Utc::now(),
1826        }];
1827        let output = format_sessions(&sessions);
1828        assert!(output.contains("NAME"));
1829        assert!(output.contains("my-api"));
1830        assert!(output.contains("running"));
1831        assert!(output.contains("claude"));
1832        assert!(output.contains("Fix the bug"));
1833    }
1834
1835    #[test]
1836    fn test_format_sessions_long_prompt_truncated() {
1837        use chrono::Utc;
1838        use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1839        use uuid::Uuid;
1840
1841        let sessions = vec![Session {
1842            id: Uuid::nil(),
1843            name: "test".into(),
1844            workdir: "/tmp".into(),
1845            provider: Provider::Codex,
1846            prompt: "A very long prompt that exceeds forty characters in total length".into(),
1847            status: SessionStatus::Completed,
1848            mode: SessionMode::Autonomous,
1849            conversation_id: None,
1850            exit_code: None,
1851            backend_session_id: None,
1852            output_snapshot: None,
1853            guard_config: None,
1854            model: None,
1855            allowed_tools: None,
1856            system_prompt: None,
1857            metadata: None,
1858            persona: None,
1859            max_turns: None,
1860            max_budget_usd: None,
1861            output_format: None,
1862            intervention_reason: None,
1863            intervention_at: None,
1864            last_output_at: None,
1865            idle_since: None,
1866            waiting_for_input: false,
1867            created_at: Utc::now(),
1868            updated_at: Utc::now(),
1869        }];
1870        let output = format_sessions(&sessions);
1871        assert!(output.contains("..."));
1872    }
1873
1874    #[test]
1875    fn test_format_sessions_waiting_for_input() {
1876        use chrono::Utc;
1877        use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1878        use uuid::Uuid;
1879
1880        let sessions = vec![Session {
1881            id: Uuid::nil(),
1882            name: "blocked".into(),
1883            workdir: "/tmp".into(),
1884            provider: Provider::Claude,
1885            prompt: "Fix bug".into(),
1886            status: SessionStatus::Running,
1887            mode: SessionMode::Interactive,
1888            conversation_id: None,
1889            exit_code: None,
1890            backend_session_id: None,
1891            output_snapshot: None,
1892            guard_config: None,
1893            model: None,
1894            allowed_tools: None,
1895            system_prompt: None,
1896            metadata: None,
1897            persona: None,
1898            max_turns: None,
1899            max_budget_usd: None,
1900            output_format: None,
1901            intervention_reason: None,
1902            intervention_at: None,
1903            last_output_at: None,
1904            idle_since: None,
1905            waiting_for_input: true,
1906            created_at: Utc::now(),
1907            updated_at: Utc::now(),
1908        }];
1909        let output = format_sessions(&sessions);
1910        assert!(output.contains("waiting"));
1911        assert!(!output.contains("running"));
1912    }
1913
1914    #[test]
1915    fn test_format_nodes() {
1916        use pulpo_common::node::NodeInfo;
1917        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
1918
1919        let resp = PeersResponse {
1920            local: NodeInfo {
1921                name: "mac-mini".into(),
1922                hostname: "h".into(),
1923                os: "macos".into(),
1924                arch: "arm64".into(),
1925                cpus: 8,
1926                memory_mb: 16384,
1927                gpu: None,
1928            },
1929            peers: vec![PeerInfo {
1930                name: "win-pc".into(),
1931                address: "win-pc:7433".into(),
1932                status: PeerStatus::Online,
1933                node_info: None,
1934                session_count: Some(3),
1935                source: PeerSource::Configured,
1936            }],
1937        };
1938        let output = format_nodes(&resp);
1939        assert!(output.contains("mac-mini"));
1940        assert!(output.contains("(local)"));
1941        assert!(output.contains("win-pc"));
1942        assert!(output.contains('3'));
1943    }
1944
1945    #[test]
1946    fn test_format_nodes_no_session_count() {
1947        use pulpo_common::node::NodeInfo;
1948        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
1949
1950        let resp = PeersResponse {
1951            local: NodeInfo {
1952                name: "local".into(),
1953                hostname: "h".into(),
1954                os: "linux".into(),
1955                arch: "x86_64".into(),
1956                cpus: 4,
1957                memory_mb: 8192,
1958                gpu: None,
1959            },
1960            peers: vec![PeerInfo {
1961                name: "peer".into(),
1962                address: "peer:7433".into(),
1963                status: PeerStatus::Offline,
1964                node_info: None,
1965                session_count: None,
1966                source: PeerSource::Configured,
1967            }],
1968        };
1969        let output = format_nodes(&resp);
1970        assert!(output.contains("offline"));
1971        // No session count → shows "-"
1972        let lines: Vec<&str> = output.lines().collect();
1973        assert!(lines[2].contains('-'));
1974    }
1975
1976    #[tokio::test]
1977    async fn test_execute_resume_connection_refused() {
1978        let cli = Cli {
1979            node: "localhost:1".into(),
1980            token: None,
1981            command: Commands::Resume {
1982                name: "test".into(),
1983            },
1984        };
1985        let result = execute(&cli).await;
1986        let err = result.unwrap_err().to_string();
1987        assert!(err.contains("Could not connect to pulpod"));
1988    }
1989
1990    #[tokio::test]
1991    async fn test_execute_spawn_connection_refused() {
1992        let cli = Cli {
1993            node: "localhost:1".into(),
1994            token: None,
1995            command: Commands::Spawn {
1996                workdir: "/tmp".into(),
1997                name: None,
1998                provider: "claude".into(),
1999                auto: false,
2000                guard: "standard".into(),
2001                model: None,
2002                system_prompt: None,
2003                allowed_tools: None,
2004                persona: None,
2005                max_turns: None,
2006                max_budget: None,
2007                output_format: None,
2008                prompt: vec!["test".into()],
2009            },
2010        };
2011        let result = execute(&cli).await;
2012        let err = result.unwrap_err().to_string();
2013        assert!(err.contains("Could not connect to pulpod"));
2014    }
2015
2016    #[tokio::test]
2017    async fn test_execute_kill_connection_refused() {
2018        let cli = Cli {
2019            node: "localhost:1".into(),
2020            token: None,
2021            command: Commands::Kill {
2022                name: "test".into(),
2023            },
2024        };
2025        let result = execute(&cli).await;
2026        let err = result.unwrap_err().to_string();
2027        assert!(err.contains("Could not connect to pulpod"));
2028    }
2029
2030    #[tokio::test]
2031    async fn test_execute_delete_connection_refused() {
2032        let cli = Cli {
2033            node: "localhost:1".into(),
2034            token: None,
2035            command: Commands::Delete {
2036                name: "test".into(),
2037            },
2038        };
2039        let result = execute(&cli).await;
2040        let err = result.unwrap_err().to_string();
2041        assert!(err.contains("Could not connect to pulpod"));
2042    }
2043
2044    #[tokio::test]
2045    async fn test_execute_logs_connection_refused() {
2046        let cli = Cli {
2047            node: "localhost:1".into(),
2048            token: None,
2049            command: Commands::Logs {
2050                name: "test".into(),
2051                lines: 50,
2052                follow: false,
2053            },
2054        };
2055        let result = execute(&cli).await;
2056        let err = result.unwrap_err().to_string();
2057        assert!(err.contains("Could not connect to pulpod"));
2058    }
2059
2060    #[tokio::test]
2061    async fn test_friendly_error_connect() {
2062        // Make a request to a closed port to get a connect error
2063        let err = reqwest::Client::new()
2064            .get("http://127.0.0.1:1")
2065            .send()
2066            .await
2067            .unwrap_err();
2068        let friendly = friendly_error(&err, "test-node:1");
2069        let msg = friendly.to_string();
2070        assert!(
2071            msg.contains("Could not connect"),
2072            "Expected connect message, got: {msg}"
2073        );
2074    }
2075
2076    #[tokio::test]
2077    async fn test_friendly_error_other() {
2078        // A request to an invalid URL creates a builder error, not a connect error
2079        let err = reqwest::Client::new()
2080            .get("http://[::invalid::url")
2081            .send()
2082            .await
2083            .unwrap_err();
2084        let friendly = friendly_error(&err, "bad-host");
2085        let msg = friendly.to_string();
2086        assert!(
2087            msg.contains("Network error"),
2088            "Expected network error message, got: {msg}"
2089        );
2090        assert!(msg.contains("bad-host"));
2091    }
2092
2093    // -- Auth helper tests --
2094
2095    #[test]
2096    fn test_is_localhost_variants() {
2097        assert!(is_localhost("localhost:7433"));
2098        assert!(is_localhost("127.0.0.1:7433"));
2099        assert!(is_localhost("[::1]:7433"));
2100        assert!(is_localhost("::1"));
2101        assert!(is_localhost("localhost"));
2102        assert!(!is_localhost("mac-mini:7433"));
2103        assert!(!is_localhost("192.168.1.100:7433"));
2104    }
2105
2106    #[test]
2107    fn test_authed_get_with_token() {
2108        let client = reqwest::Client::new();
2109        let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2110            .build()
2111            .unwrap();
2112        let auth = req
2113            .headers()
2114            .get("authorization")
2115            .unwrap()
2116            .to_str()
2117            .unwrap();
2118        assert_eq!(auth, "Bearer tok");
2119    }
2120
2121    #[test]
2122    fn test_authed_get_without_token() {
2123        let client = reqwest::Client::new();
2124        let req = authed_get(&client, "http://h:1/api".into(), None)
2125            .build()
2126            .unwrap();
2127        assert!(req.headers().get("authorization").is_none());
2128    }
2129
2130    #[test]
2131    fn test_authed_post_with_token() {
2132        let client = reqwest::Client::new();
2133        let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2134            .build()
2135            .unwrap();
2136        let auth = req
2137            .headers()
2138            .get("authorization")
2139            .unwrap()
2140            .to_str()
2141            .unwrap();
2142        assert_eq!(auth, "Bearer secret");
2143    }
2144
2145    #[test]
2146    fn test_authed_post_without_token() {
2147        let client = reqwest::Client::new();
2148        let req = authed_post(&client, "http://h:1/api".into(), None)
2149            .build()
2150            .unwrap();
2151        assert!(req.headers().get("authorization").is_none());
2152    }
2153
2154    #[test]
2155    fn test_authed_delete_with_token() {
2156        let client = reqwest::Client::new();
2157        let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2158            .build()
2159            .unwrap();
2160        let auth = req
2161            .headers()
2162            .get("authorization")
2163            .unwrap()
2164            .to_str()
2165            .unwrap();
2166        assert_eq!(auth, "Bearer del-tok");
2167    }
2168
2169    #[test]
2170    fn test_authed_delete_without_token() {
2171        let client = reqwest::Client::new();
2172        let req = authed_delete(&client, "http://h:1/api".into(), None)
2173            .build()
2174            .unwrap();
2175        assert!(req.headers().get("authorization").is_none());
2176    }
2177
2178    #[tokio::test]
2179    async fn test_resolve_token_explicit() {
2180        let client = reqwest::Client::new();
2181        let token =
2182            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2183        assert_eq!(token, Some("my-tok".into()));
2184    }
2185
2186    #[tokio::test]
2187    async fn test_resolve_token_remote_no_explicit() {
2188        let client = reqwest::Client::new();
2189        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2190        assert_eq!(token, None);
2191    }
2192
2193    #[tokio::test]
2194    async fn test_resolve_token_localhost_auto_discover() {
2195        use axum::{Json, Router, routing::get};
2196
2197        let app = Router::new().route(
2198            "/api/v1/auth/token",
2199            get(|| async {
2200                Json(AuthTokenResponse {
2201                    token: "discovered".into(),
2202                })
2203            }),
2204        );
2205        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2206        let addr = listener.local_addr().unwrap();
2207        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2208
2209        let node = format!("localhost:{}", addr.port());
2210        let base = base_url(&node);
2211        let client = reqwest::Client::new();
2212        let token = resolve_token(&client, &base, &node, None).await;
2213        assert_eq!(token, Some("discovered".into()));
2214    }
2215
2216    #[tokio::test]
2217    async fn test_discover_token_empty_returns_none() {
2218        use axum::{Json, Router, routing::get};
2219
2220        let app = Router::new().route(
2221            "/api/v1/auth/token",
2222            get(|| async {
2223                Json(AuthTokenResponse {
2224                    token: String::new(),
2225                })
2226            }),
2227        );
2228        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2229        let addr = listener.local_addr().unwrap();
2230        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2231
2232        let base = format!("http://127.0.0.1:{}", addr.port());
2233        let client = reqwest::Client::new();
2234        assert_eq!(discover_token(&client, &base).await, None);
2235    }
2236
2237    #[tokio::test]
2238    async fn test_discover_token_unreachable_returns_none() {
2239        let client = reqwest::Client::new();
2240        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2241    }
2242
2243    #[test]
2244    fn test_cli_parse_with_token() {
2245        let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
2246        assert_eq!(cli.token, Some("my-secret".into()));
2247    }
2248
2249    #[test]
2250    fn test_cli_parse_without_token() {
2251        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2252        assert_eq!(cli.token, None);
2253    }
2254
2255    #[tokio::test]
2256    async fn test_execute_with_explicit_token_sends_header() {
2257        use axum::{Router, extract::Request, http::StatusCode, routing::get};
2258
2259        let app = Router::new().route(
2260            "/api/v1/sessions",
2261            get(|req: Request| async move {
2262                let auth = req
2263                    .headers()
2264                    .get("authorization")
2265                    .and_then(|v| v.to_str().ok())
2266                    .unwrap_or("");
2267                assert_eq!(auth, "Bearer test-token");
2268                (StatusCode::OK, "[]".to_owned())
2269            }),
2270        );
2271        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2272        let addr = listener.local_addr().unwrap();
2273        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2274        let node = format!("127.0.0.1:{}", addr.port());
2275
2276        let cli = Cli {
2277            node,
2278            token: Some("test-token".into()),
2279            command: Commands::List,
2280        };
2281        let result = execute(&cli).await.unwrap();
2282        assert_eq!(result, "No sessions.");
2283    }
2284
2285    // -- Interventions tests --
2286
2287    #[test]
2288    fn test_cli_parse_interventions() {
2289        let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
2290        assert!(matches!(
2291            &cli.command,
2292            Commands::Interventions { name } if name == "my-session"
2293        ));
2294    }
2295
2296    #[test]
2297    fn test_format_interventions_empty() {
2298        assert_eq!(format_interventions(&[]), "No intervention events.");
2299    }
2300
2301    #[test]
2302    fn test_format_interventions_with_data() {
2303        let events = vec![
2304            InterventionEventResponse {
2305                id: 1,
2306                session_id: "sess-1".into(),
2307                reason: "Memory exceeded threshold".into(),
2308                created_at: "2026-01-01T00:00:00Z".into(),
2309            },
2310            InterventionEventResponse {
2311                id: 2,
2312                session_id: "sess-1".into(),
2313                reason: "Idle for 10 minutes".into(),
2314                created_at: "2026-01-02T00:00:00Z".into(),
2315            },
2316        ];
2317        let output = format_interventions(&events);
2318        assert!(output.contains("ID"));
2319        assert!(output.contains("TIMESTAMP"));
2320        assert!(output.contains("REASON"));
2321        assert!(output.contains("Memory exceeded threshold"));
2322        assert!(output.contains("Idle for 10 minutes"));
2323        assert!(output.contains("2026-01-01T00:00:00Z"));
2324    }
2325
2326    #[tokio::test]
2327    async fn test_execute_interventions_empty() {
2328        let node = start_test_server().await;
2329        let cli = Cli {
2330            node,
2331            token: None,
2332            command: Commands::Interventions {
2333                name: "my-session".into(),
2334            },
2335        };
2336        let result = execute(&cli).await.unwrap();
2337        assert_eq!(result, "No intervention events.");
2338    }
2339
2340    #[tokio::test]
2341    async fn test_execute_interventions_with_data() {
2342        use axum::{Router, routing::get};
2343
2344        let app = Router::new().route(
2345            "/api/v1/sessions/{id}/interventions",
2346            get(|| async {
2347                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2348                    .to_owned()
2349            }),
2350        );
2351        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2352        let addr = listener.local_addr().unwrap();
2353        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2354        let node = format!("127.0.0.1:{}", addr.port());
2355
2356        let cli = Cli {
2357            node,
2358            token: None,
2359            command: Commands::Interventions {
2360                name: "test".into(),
2361            },
2362        };
2363        let result = execute(&cli).await.unwrap();
2364        assert!(result.contains("OOM"));
2365        assert!(result.contains("2026-01-01T00:00:00Z"));
2366    }
2367
2368    #[tokio::test]
2369    async fn test_execute_interventions_connection_refused() {
2370        let cli = Cli {
2371            node: "localhost:1".into(),
2372            token: None,
2373            command: Commands::Interventions {
2374                name: "test".into(),
2375            },
2376        };
2377        let result = execute(&cli).await;
2378        let err = result.unwrap_err().to_string();
2379        assert!(err.contains("Could not connect to pulpod"));
2380    }
2381
2382    // -- Attach command tests --
2383
2384    #[test]
2385    fn test_build_attach_command() {
2386        let cmd = build_attach_command("my-session");
2387        assert_eq!(cmd.get_program(), "tmux");
2388        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2389        assert_eq!(args, vec!["attach-session", "-t", "pulpo-my-session"]);
2390    }
2391
2392    #[test]
2393    fn test_cli_parse_attach() {
2394        let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2395        assert!(matches!(
2396            &cli.command,
2397            Commands::Attach { name } if name == "my-session"
2398        ));
2399    }
2400
2401    #[test]
2402    fn test_cli_parse_attach_alias() {
2403        let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2404        assert!(matches!(
2405            &cli.command,
2406            Commands::Attach { name } if name == "my-session"
2407        ));
2408    }
2409
2410    #[tokio::test]
2411    async fn test_execute_attach_success() {
2412        let cli = Cli {
2413            node: "localhost:7433".into(),
2414            token: None,
2415            command: Commands::Attach {
2416                name: "test-session".into(),
2417            },
2418        };
2419        let result = execute(&cli).await.unwrap();
2420        assert!(result.contains("Detached from session test-session"));
2421    }
2422
2423    // -- Alias parse tests --
2424
2425    #[test]
2426    fn test_cli_parse_alias_spawn() {
2427        let cli = Cli::try_parse_from(["pulpo", "s", "--workdir", "/tmp", "Do it"]).unwrap();
2428        assert!(matches!(&cli.command, Commands::Spawn { .. }));
2429    }
2430
2431    #[test]
2432    fn test_cli_parse_alias_list() {
2433        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2434        assert!(matches!(cli.command, Commands::List));
2435    }
2436
2437    #[test]
2438    fn test_cli_parse_alias_logs() {
2439        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2440        assert!(matches!(
2441            &cli.command,
2442            Commands::Logs { name, .. } if name == "my-session"
2443        ));
2444    }
2445
2446    #[test]
2447    fn test_cli_parse_alias_kill() {
2448        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2449        assert!(matches!(
2450            &cli.command,
2451            Commands::Kill { name } if name == "my-session"
2452        ));
2453    }
2454
2455    #[test]
2456    fn test_cli_parse_alias_delete() {
2457        let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2458        assert!(matches!(
2459            &cli.command,
2460            Commands::Delete { name } if name == "my-session"
2461        ));
2462    }
2463
2464    #[test]
2465    fn test_cli_parse_alias_resume() {
2466        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
2467        assert!(matches!(
2468            &cli.command,
2469            Commands::Resume { name } if name == "my-session"
2470        ));
2471    }
2472
2473    #[test]
2474    fn test_cli_parse_alias_nodes() {
2475        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
2476        assert!(matches!(cli.command, Commands::Nodes));
2477    }
2478
2479    #[test]
2480    fn test_cli_parse_alias_interventions() {
2481        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
2482        assert!(matches!(
2483            &cli.command,
2484            Commands::Interventions { name } if name == "my-session"
2485        ));
2486    }
2487
2488    #[test]
2489    fn test_api_error_json() {
2490        let err = api_error("{\"error\":\"session not found: foo\"}");
2491        assert_eq!(err.to_string(), "session not found: foo");
2492    }
2493
2494    #[test]
2495    fn test_api_error_plain_text() {
2496        let err = api_error("plain text error");
2497        assert_eq!(err.to_string(), "plain text error");
2498    }
2499
2500    // -- diff_output tests --
2501
2502    #[test]
2503    fn test_diff_output_empty_prev() {
2504        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
2505    }
2506
2507    #[test]
2508    fn test_diff_output_identical() {
2509        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
2510    }
2511
2512    #[test]
2513    fn test_diff_output_new_lines_appended() {
2514        let prev = "line1\nline2";
2515        let new = "line1\nline2\nline3\nline4";
2516        assert_eq!(diff_output(prev, new), "line3\nline4");
2517    }
2518
2519    #[test]
2520    fn test_diff_output_scrolled_window() {
2521        // Window of 3 lines: old lines scroll off top, new appear at bottom
2522        let prev = "line1\nline2\nline3";
2523        let new = "line2\nline3\nline4";
2524        assert_eq!(diff_output(prev, new), "line4");
2525    }
2526
2527    #[test]
2528    fn test_diff_output_completely_different() {
2529        let prev = "aaa\nbbb";
2530        let new = "xxx\nyyy";
2531        assert_eq!(diff_output(prev, new), "xxx\nyyy");
2532    }
2533
2534    #[test]
2535    fn test_diff_output_last_line_matches_but_overlap_fails() {
2536        // Last line of prev appears in new but preceding lines don't match
2537        let prev = "aaa\ncommon";
2538        let new = "zzz\ncommon\nnew_line";
2539        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
2540        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
2541        // Falls through, no verified overlap, so returns everything
2542        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
2543    }
2544
2545    #[test]
2546    fn test_diff_output_new_empty() {
2547        assert_eq!(diff_output("line1", ""), "");
2548    }
2549
2550    // -- follow_logs tests --
2551
2552    /// Start a test server that simulates evolving output and session status transitions.
2553    async fn start_follow_test_server() -> String {
2554        use axum::{Router, extract::Path, extract::Query, routing::get};
2555        use std::sync::Arc;
2556        use std::sync::atomic::{AtomicUsize, Ordering};
2557
2558        let call_count = Arc::new(AtomicUsize::new(0));
2559        let output_count = call_count.clone();
2560        let status_count = Arc::new(AtomicUsize::new(0));
2561        let status_count_inner = status_count.clone();
2562
2563        let app = Router::new()
2564            .route(
2565                "/api/v1/sessions/{id}/output",
2566                get(
2567                    move |_path: Path<String>,
2568                          _query: Query<std::collections::HashMap<String, String>>| {
2569                        let count = output_count.clone();
2570                        async move {
2571                            let n = count.fetch_add(1, Ordering::SeqCst);
2572                            let output = match n {
2573                                0 => "line1\nline2".to_owned(),
2574                                1 => "line1\nline2\nline3".to_owned(),
2575                                _ => "line2\nline3\nline4".to_owned(),
2576                            };
2577                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2578                        }
2579                    },
2580                ),
2581            )
2582            .route(
2583                "/api/v1/sessions/{id}",
2584                get(move |_path: Path<String>| {
2585                    let count = status_count_inner.clone();
2586                    async move {
2587                        let n = count.fetch_add(1, Ordering::SeqCst);
2588                        let status = if n < 2 { "running" } else { "completed" };
2589                        format!(
2590                            r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
2591                        )
2592                    }
2593                }),
2594            );
2595
2596        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2597        let addr = listener.local_addr().unwrap();
2598        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2599        format!("http://127.0.0.1:{}", addr.port())
2600    }
2601
2602    #[tokio::test]
2603    async fn test_follow_logs_polls_and_exits_on_completed() {
2604        let base = start_follow_test_server().await;
2605        let client = reqwest::Client::new();
2606        let mut buf = Vec::new();
2607
2608        follow_logs(&client, &base, "test", 100, None, &mut buf)
2609            .await
2610            .unwrap();
2611
2612        let output = String::from_utf8(buf).unwrap();
2613        // Should contain initial output + new lines
2614        assert!(output.contains("line1"));
2615        assert!(output.contains("line2"));
2616        assert!(output.contains("line3"));
2617        assert!(output.contains("line4"));
2618    }
2619
2620    #[tokio::test]
2621    async fn test_execute_logs_follow_success() {
2622        let base = start_follow_test_server().await;
2623        // Extract host:port from http://127.0.0.1:PORT
2624        let node = base.strip_prefix("http://").unwrap().to_owned();
2625
2626        let cli = Cli {
2627            node,
2628            token: None,
2629            command: Commands::Logs {
2630                name: "test".into(),
2631                lines: 100,
2632                follow: true,
2633            },
2634        };
2635        // execute() with follow writes to stdout and returns empty string
2636        let result = execute(&cli).await.unwrap();
2637        assert_eq!(result, "");
2638    }
2639
2640    #[tokio::test]
2641    async fn test_execute_logs_follow_connection_refused() {
2642        let cli = Cli {
2643            node: "localhost:1".into(),
2644            token: None,
2645            command: Commands::Logs {
2646                name: "test".into(),
2647                lines: 50,
2648                follow: true,
2649            },
2650        };
2651        let result = execute(&cli).await;
2652        let err = result.unwrap_err().to_string();
2653        assert!(
2654            err.contains("Could not connect to pulpod"),
2655            "Expected friendly error, got: {err}"
2656        );
2657    }
2658
2659    #[tokio::test]
2660    async fn test_follow_logs_exits_on_dead() {
2661        use axum::{Router, extract::Path, extract::Query, routing::get};
2662
2663        let app = Router::new()
2664            .route(
2665                "/api/v1/sessions/{id}/output",
2666                get(
2667                    |_path: Path<String>,
2668                     _query: Query<std::collections::HashMap<String, String>>| async {
2669                        r#"{"output":"some output"}"#.to_owned()
2670                    },
2671                ),
2672            )
2673            .route(
2674                "/api/v1/sessions/{id}",
2675                get(|_path: Path<String>| async {
2676                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2677                }),
2678            );
2679
2680        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2681        let addr = listener.local_addr().unwrap();
2682        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2683        let base = format!("http://127.0.0.1:{}", addr.port());
2684
2685        let client = reqwest::Client::new();
2686        let mut buf = Vec::new();
2687        follow_logs(&client, &base, "test", 100, None, &mut buf)
2688            .await
2689            .unwrap();
2690
2691        let output = String::from_utf8(buf).unwrap();
2692        assert!(output.contains("some output"));
2693    }
2694
2695    #[tokio::test]
2696    async fn test_follow_logs_exits_on_stale() {
2697        use axum::{Router, extract::Path, extract::Query, routing::get};
2698
2699        let app = Router::new()
2700            .route(
2701                "/api/v1/sessions/{id}/output",
2702                get(
2703                    |_path: Path<String>,
2704                     _query: Query<std::collections::HashMap<String, String>>| async {
2705                        r#"{"output":"stale output"}"#.to_owned()
2706                    },
2707                ),
2708            )
2709            .route(
2710                "/api/v1/sessions/{id}",
2711                get(|_path: Path<String>| async {
2712                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2713                }),
2714            );
2715
2716        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2717        let addr = listener.local_addr().unwrap();
2718        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2719        let base = format!("http://127.0.0.1:{}", addr.port());
2720
2721        let client = reqwest::Client::new();
2722        let mut buf = Vec::new();
2723        follow_logs(&client, &base, "test", 100, None, &mut buf)
2724            .await
2725            .unwrap();
2726
2727        let output = String::from_utf8(buf).unwrap();
2728        assert!(output.contains("stale output"));
2729    }
2730
2731    #[tokio::test]
2732    async fn test_execute_logs_follow_non_reqwest_error() {
2733        use axum::{Router, extract::Path, extract::Query, routing::get};
2734
2735        // Session status endpoint returns invalid JSON to trigger a serde error
2736        let app = Router::new()
2737            .route(
2738                "/api/v1/sessions/{id}/output",
2739                get(
2740                    |_path: Path<String>,
2741                     _query: Query<std::collections::HashMap<String, String>>| async {
2742                        r#"{"output":"initial"}"#.to_owned()
2743                    },
2744                ),
2745            )
2746            .route(
2747                "/api/v1/sessions/{id}",
2748                get(|_path: Path<String>| async { "not valid json".to_owned() }),
2749            );
2750
2751        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2752        let addr = listener.local_addr().unwrap();
2753        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2754        let node = format!("127.0.0.1:{}", addr.port());
2755
2756        let cli = Cli {
2757            node,
2758            token: None,
2759            command: Commands::Logs {
2760                name: "test".into(),
2761                lines: 100,
2762                follow: true,
2763            },
2764        };
2765        let err = execute(&cli).await.unwrap_err();
2766        // serde_json error, not a reqwest error — hits the Err(other) branch
2767        let msg = err.to_string();
2768        assert!(
2769            msg.contains("expected ident"),
2770            "Expected serde parse error, got: {msg}"
2771        );
2772    }
2773
2774    #[test]
2775    fn test_cli_parse_spawn_with_guardrails() {
2776        let cli = Cli::try_parse_from([
2777            "pulpo",
2778            "spawn",
2779            "--workdir",
2780            "/tmp",
2781            "--max-turns",
2782            "10",
2783            "--max-budget",
2784            "5.5",
2785            "--output-format",
2786            "json",
2787            "Do it",
2788        ])
2789        .unwrap();
2790        assert!(matches!(
2791            &cli.command,
2792            Commands::Spawn { max_turns, max_budget, output_format, .. }
2793                if *max_turns == Some(10) && *max_budget == Some(5.5)
2794                && output_format.as_deref() == Some("json")
2795        ));
2796    }
2797
2798    #[tokio::test]
2799    async fn test_fetch_session_status_connection_error() {
2800        let client = reqwest::Client::new();
2801        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
2802        assert!(result.is_err());
2803    }
2804
2805    // -- Crontab wrapper tests --
2806
2807    #[test]
2808    fn test_build_crontab_line() {
2809        let line = build_crontab_line(
2810            "nightly-review",
2811            "0 3 * * *",
2812            "/home/me/repo",
2813            "claude",
2814            "Review PRs",
2815            "localhost:7433",
2816        );
2817        assert_eq!(
2818            line,
2819            "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n"
2820        );
2821    }
2822
2823    #[test]
2824    fn test_crontab_install_success() {
2825        let crontab = "# existing cron\n0 * * * * echo hi\n";
2826        let line = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";
2827        let result = crontab_install(crontab, "my-job", line).unwrap();
2828        assert!(result.starts_with("# existing cron\n"));
2829        assert!(result.ends_with("#pulpo:my-job\n"));
2830        assert!(result.contains("echo hi"));
2831    }
2832
2833    #[test]
2834    fn test_crontab_install_duplicate_error() {
2835        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
2836        let line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
2837        let err = crontab_install(crontab, "my-job", line).unwrap_err();
2838        assert!(err.to_string().contains("already exists"));
2839    }
2840
2841    #[test]
2842    fn test_crontab_list_empty() {
2843        assert_eq!(crontab_list(""), "No pulpo schedules.");
2844    }
2845
2846    #[test]
2847    fn test_crontab_list_no_pulpo_entries() {
2848        assert_eq!(crontab_list("0 * * * * echo hi\n"), "No pulpo schedules.");
2849    }
2850
2851    #[test]
2852    fn test_crontab_list_with_entries() {
2853        let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
2854        let output = crontab_list(crontab);
2855        assert!(output.contains("NAME"));
2856        assert!(output.contains("CRON"));
2857        assert!(output.contains("PAUSED"));
2858        assert!(output.contains("nightly"));
2859        assert!(output.contains("0 3 * * *"));
2860        assert!(output.contains("no"));
2861    }
2862
2863    #[test]
2864    fn test_crontab_list_paused_entry() {
2865        let crontab = "#0 3 * * * pulpo spawn task #pulpo:paused-job\n";
2866        let output = crontab_list(crontab);
2867        assert!(output.contains("paused-job"));
2868        assert!(output.contains("yes"));
2869    }
2870
2871    #[test]
2872    fn test_crontab_list_short_line() {
2873        // A line with fewer than 5 space-separated parts but still tagged
2874        let crontab = "badcron #pulpo:broken\n";
2875        let output = crontab_list(crontab);
2876        assert!(output.contains("broken"));
2877        assert!(output.contains('?'));
2878    }
2879
2880    #[test]
2881    fn test_crontab_remove_success() {
2882        let crontab = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
2883        let result = crontab_remove(crontab, "my-job").unwrap();
2884        assert!(result.contains("echo hi"));
2885        assert!(!result.contains("my-job"));
2886    }
2887
2888    #[test]
2889    fn test_crontab_remove_not_found() {
2890        let crontab = "0 * * * * echo hi\n";
2891        let err = crontab_remove(crontab, "ghost").unwrap_err();
2892        assert!(err.to_string().contains("not found"));
2893    }
2894
2895    #[test]
2896    fn test_crontab_pause_success() {
2897        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
2898        let result = crontab_pause(crontab, "my-job").unwrap();
2899        assert!(result.starts_with('#'));
2900        assert!(result.contains("#pulpo:my-job"));
2901    }
2902
2903    #[test]
2904    fn test_crontab_pause_not_found() {
2905        let crontab = "0 * * * * echo hi\n";
2906        let err = crontab_pause(crontab, "ghost").unwrap_err();
2907        assert!(err.to_string().contains("not found or already paused"));
2908    }
2909
2910    #[test]
2911    fn test_crontab_pause_already_paused() {
2912        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
2913        let err = crontab_pause(crontab, "my-job").unwrap_err();
2914        assert!(err.to_string().contains("already paused"));
2915    }
2916
2917    #[test]
2918    fn test_crontab_resume_success() {
2919        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
2920        let result = crontab_resume(crontab, "my-job").unwrap();
2921        assert!(!result.starts_with('#'));
2922        assert!(result.contains("#pulpo:my-job"));
2923    }
2924
2925    #[test]
2926    fn test_crontab_resume_not_found() {
2927        let crontab = "0 * * * * echo hi\n";
2928        let err = crontab_resume(crontab, "ghost").unwrap_err();
2929        assert!(err.to_string().contains("not found or not paused"));
2930    }
2931
2932    #[test]
2933    fn test_crontab_resume_not_paused() {
2934        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
2935        let err = crontab_resume(crontab, "my-job").unwrap_err();
2936        assert!(err.to_string().contains("not paused"));
2937    }
2938
2939    // -- Schedule CLI parse tests --
2940
2941    #[test]
2942    fn test_cli_parse_schedule_install() {
2943        let cli = Cli::try_parse_from([
2944            "pulpo",
2945            "schedule",
2946            "install",
2947            "nightly",
2948            "0 3 * * *",
2949            "--workdir",
2950            "/tmp/repo",
2951            "Review",
2952            "PRs",
2953        ])
2954        .unwrap();
2955        assert!(matches!(
2956            &cli.command,
2957            Commands::Schedule {
2958                action: ScheduleAction::Install { name, cron, workdir, provider, prompt }
2959            } if name == "nightly" && cron == "0 3 * * *" && workdir == "/tmp/repo"
2960              && provider == "claude" && prompt == &["Review", "PRs"]
2961        ));
2962    }
2963
2964    #[test]
2965    fn test_cli_parse_schedule_list() {
2966        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
2967        assert!(matches!(
2968            &cli.command,
2969            Commands::Schedule {
2970                action: ScheduleAction::List
2971            }
2972        ));
2973    }
2974
2975    #[test]
2976    fn test_cli_parse_schedule_remove() {
2977        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
2978        assert!(matches!(
2979            &cli.command,
2980            Commands::Schedule {
2981                action: ScheduleAction::Remove { name }
2982            } if name == "nightly"
2983        ));
2984    }
2985
2986    #[test]
2987    fn test_cli_parse_schedule_pause() {
2988        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
2989        assert!(matches!(
2990            &cli.command,
2991            Commands::Schedule {
2992                action: ScheduleAction::Pause { name }
2993            } if name == "nightly"
2994        ));
2995    }
2996
2997    #[test]
2998    fn test_cli_parse_schedule_resume() {
2999        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3000        assert!(matches!(
3001            &cli.command,
3002            Commands::Schedule {
3003                action: ScheduleAction::Resume { name }
3004            } if name == "nightly"
3005        ));
3006    }
3007
3008    #[test]
3009    fn test_cli_parse_schedule_alias() {
3010        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3011        assert!(matches!(
3012            &cli.command,
3013            Commands::Schedule {
3014                action: ScheduleAction::List
3015            }
3016        ));
3017    }
3018
3019    #[test]
3020    fn test_cli_parse_schedule_list_alias() {
3021        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3022        assert!(matches!(
3023            &cli.command,
3024            Commands::Schedule {
3025                action: ScheduleAction::List
3026            }
3027        ));
3028    }
3029
3030    #[test]
3031    fn test_cli_parse_schedule_remove_alias() {
3032        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3033        assert!(matches!(
3034            &cli.command,
3035            Commands::Schedule {
3036                action: ScheduleAction::Remove { name }
3037            } if name == "nightly"
3038        ));
3039    }
3040
3041    #[test]
3042    fn test_cli_parse_schedule_install_custom_provider() {
3043        let cli = Cli::try_parse_from([
3044            "pulpo",
3045            "schedule",
3046            "install",
3047            "daily",
3048            "0 9 * * *",
3049            "--workdir",
3050            "/tmp",
3051            "--provider",
3052            "codex",
3053            "Run tests",
3054        ])
3055        .unwrap();
3056        assert!(matches!(
3057            &cli.command,
3058            Commands::Schedule {
3059                action: ScheduleAction::Install { provider, .. }
3060            } if provider == "codex"
3061        ));
3062    }
3063
3064    #[tokio::test]
3065    async fn test_execute_schedule_via_execute() {
3066        // Under coverage builds, execute_schedule is a stub returning Ok("")
3067        let node = start_test_server().await;
3068        let cli = Cli {
3069            node,
3070            token: None,
3071            command: Commands::Schedule {
3072                action: ScheduleAction::List,
3073            },
3074        };
3075        let result = execute(&cli).await;
3076        // Under coverage: succeeds with empty string; under non-coverage: may fail (no crontab)
3077        assert!(result.is_ok() || result.is_err());
3078    }
3079
3080    #[test]
3081    fn test_schedule_action_debug() {
3082        let action = ScheduleAction::List;
3083        assert_eq!(format!("{action:?}"), "List");
3084    }
3085}