Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4    AuthTokenResponse, CreateSessionResponse, InterventionEventResponse, PeersResponse,
5};
6use pulpo_common::session::Session;
7
8#[derive(Parser, Debug)]
9#[command(
10    name = "pulpo",
11    about = "Manage agent sessions across your machines",
12    version = env!("PULPO_VERSION")
13)]
14pub struct Cli {
15    /// Target node (default: localhost)
16    #[arg(long, default_value = "localhost:7433")]
17    pub node: String,
18
19    /// Auth token (auto-discovered from local daemon if omitted)
20    #[arg(long)]
21    pub token: Option<String>,
22
23    #[command(subcommand)]
24    pub command: Commands,
25}
26
27#[derive(Subcommand, Debug)]
28#[allow(clippy::large_enum_variant)]
29pub enum Commands {
30    /// Attach to a session's terminal
31    #[command(visible_alias = "a")]
32    Attach {
33        /// Session name or ID
34        name: String,
35    },
36
37    /// Send input to a session
38    #[command(visible_alias = "i")]
39    Input {
40        /// Session name or ID
41        name: String,
42        /// Text to send (sends Enter if omitted)
43        text: Option<String>,
44    },
45
46    /// Spawn a new agent session
47    #[command(visible_alias = "s")]
48    Spawn {
49        /// Session name
50        name: String,
51
52        /// Working directory (defaults to current directory)
53        #[arg(long)]
54        workdir: Option<String>,
55
56        /// Ink name (from config)
57        #[arg(long)]
58        ink: Option<String>,
59
60        /// Human-readable description of the task
61        #[arg(long)]
62        description: Option<String>,
63
64        /// Don't attach to the session after spawning
65        #[arg(short, long)]
66        detach: bool,
67
68        /// Command to run (everything after --)
69        #[arg(last = true)]
70        command: Vec<String>,
71    },
72
73    /// List all sessions
74    #[command(visible_alias = "ls")]
75    List,
76
77    /// Show session logs/output
78    #[command(visible_alias = "l")]
79    Logs {
80        /// Session name or ID
81        name: String,
82
83        /// Number of lines to fetch
84        #[arg(long, default_value = "100")]
85        lines: usize,
86
87        /// Follow output (like `tail -f`)
88        #[arg(short, long)]
89        follow: bool,
90    },
91
92    /// Kill a session
93    #[command(visible_alias = "k")]
94    Kill {
95        /// Session name or ID
96        name: String,
97    },
98
99    /// Permanently remove a session from history
100    #[command(visible_alias = "rm")]
101    Delete {
102        /// Session name or ID
103        name: String,
104    },
105
106    /// Resume a lost session
107    #[command(visible_alias = "r")]
108    Resume {
109        /// Session name or ID
110        name: String,
111    },
112
113    /// List all known nodes
114    #[command(visible_alias = "n")]
115    Nodes,
116
117    /// Show intervention history for a session
118    #[command(visible_alias = "iv")]
119    Interventions {
120        /// Session name or ID
121        name: String,
122    },
123
124    /// Open the web dashboard in your browser
125    Ui,
126
127    /// Manage scheduled agent runs via crontab
128    #[command(visible_alias = "sched")]
129    Schedule {
130        #[command(subcommand)]
131        action: ScheduleAction,
132    },
133}
134
135#[derive(Subcommand, Debug)]
136pub enum ScheduleAction {
137    /// Install a cron schedule that spawns a session
138    Install {
139        /// Schedule name
140        name: String,
141        /// Cron expression (e.g. "0 3 * * *")
142        cron: String,
143        /// Working directory
144        #[arg(long)]
145        workdir: String,
146        /// Command to run (everything after --)
147        #[arg(last = true)]
148        command: Vec<String>,
149    },
150    /// List installed pulpo cron schedules
151    #[command(alias = "ls")]
152    List,
153    /// Remove a cron schedule
154    #[command(alias = "rm")]
155    Remove {
156        /// Schedule name
157        name: String,
158    },
159    /// Pause a cron schedule (comments out the line)
160    Pause {
161        /// Schedule name
162        name: String,
163    },
164    /// Resume a paused cron schedule (uncomments the line)
165    Resume {
166        /// Schedule name
167        name: String,
168    },
169}
170
171/// Format the base URL from the node address.
172pub fn base_url(node: &str) -> String {
173    format!("http://{node}")
174}
175
176/// Response shape for the output endpoint.
177#[derive(serde::Deserialize)]
178struct OutputResponse {
179    output: String,
180}
181
182/// Format a list of sessions as a table.
183fn format_sessions(sessions: &[Session]) -> String {
184    if sessions.is_empty() {
185        return "No sessions.".into();
186    }
187    let mut lines = vec![format!("{:<20} {:<12} {}", "NAME", "STATUS", "COMMAND")];
188    for s in sessions {
189        let cmd_display = if s.command.len() > 50 {
190            format!("{}...", &s.command[..47])
191        } else {
192            s.command.clone()
193        };
194        lines.push(format!("{:<20} {:<12} {}", s.name, s.status, cmd_display));
195    }
196    lines.join("\n")
197}
198
199/// Format the peers response as a table.
200fn format_nodes(resp: &PeersResponse) -> String {
201    let mut lines = vec![format!(
202        "{:<20} {:<25} {:<10} {}",
203        "NAME", "ADDRESS", "STATUS", "SESSIONS"
204    )];
205    lines.push(format!(
206        "{:<20} {:<25} {:<10} {}",
207        resp.local.name, "(local)", "online", "-"
208    ));
209    for p in &resp.peers {
210        let sessions = p
211            .session_count
212            .map_or_else(|| "-".into(), |c| c.to_string());
213        lines.push(format!(
214            "{:<20} {:<25} {:<10} {}",
215            p.name, p.address, p.status, sessions
216        ));
217    }
218    lines.join("\n")
219}
220
221/// Format intervention events as a table.
222fn format_interventions(events: &[InterventionEventResponse]) -> String {
223    if events.is_empty() {
224        return "No intervention events.".into();
225    }
226    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
227    for e in events {
228        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
229    }
230    lines.join("\n")
231}
232
233/// Build the command to open a URL in the default browser.
234#[cfg_attr(coverage, allow(dead_code))]
235fn build_open_command(url: &str) -> std::process::Command {
236    #[cfg(target_os = "macos")]
237    {
238        let mut cmd = std::process::Command::new("open");
239        cmd.arg(url);
240        cmd
241    }
242    #[cfg(target_os = "linux")]
243    {
244        let mut cmd = std::process::Command::new("xdg-open");
245        cmd.arg(url);
246        cmd
247    }
248    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
249    {
250        // Fallback: try xdg-open
251        let mut cmd = std::process::Command::new("xdg-open");
252        cmd.arg(url);
253        cmd
254    }
255}
256
257/// Open a URL in the default browser.
258#[cfg(not(coverage))]
259fn open_browser(url: &str) -> Result<()> {
260    build_open_command(url).status()?;
261    Ok(())
262}
263
264/// Stub for coverage builds — avoids opening a browser during tests.
265#[cfg(coverage)]
266fn open_browser(_url: &str) -> Result<()> {
267    Ok(())
268}
269
270/// Build the command to attach to a session's terminal.
271/// Takes the backend session ID (e.g. `my-session`) — the tmux session name.
272#[cfg_attr(coverage, allow(dead_code))]
273fn build_attach_command(backend_session_id: &str) -> std::process::Command {
274    let mut cmd = std::process::Command::new("tmux");
275    cmd.args(["attach-session", "-t", backend_session_id]);
276    cmd
277}
278
279/// Attach to a session's terminal.
280#[cfg(not(any(test, coverage)))]
281fn attach_session(backend_session_id: &str) -> Result<()> {
282    let status = build_attach_command(backend_session_id).status()?;
283    if !status.success() {
284        anyhow::bail!("attach failed with {status}");
285    }
286    Ok(())
287}
288
289/// Stub for test and coverage builds — avoids spawning real terminals during tests.
290#[cfg(any(test, coverage))]
291#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
292fn attach_session(_backend_session_id: &str) -> Result<()> {
293    Ok(())
294}
295
296/// Extract a clean error message from an API JSON response (or fall back to raw text).
297fn api_error(text: &str) -> anyhow::Error {
298    serde_json::from_str::<serde_json::Value>(text)
299        .ok()
300        .and_then(|v| v["error"].as_str().map(String::from))
301        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
302}
303
304/// Return the response body text, or a clean error if the response was non-success.
305async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
306    if resp.status().is_success() {
307        Ok(resp.text().await?)
308    } else {
309        let text = resp.text().await?;
310        Err(api_error(&text))
311    }
312}
313
314/// Map a reqwest error to a user-friendly message.
315fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
316    if err.is_connect() {
317        anyhow::anyhow!(
318            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
319        )
320    } else {
321        anyhow::anyhow!("Network error connecting to {node}: {err}")
322    }
323}
324
325/// Check if the node address points to localhost.
326fn is_localhost(node: &str) -> bool {
327    let host = node.split(':').next().unwrap_or(node);
328    host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
329}
330
331/// Try to auto-discover the auth token from a local daemon.
332async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
333    let resp = client
334        .get(format!("{base}/api/v1/auth/token"))
335        .send()
336        .await
337        .ok()?;
338    let body: AuthTokenResponse = resp.json().await.ok()?;
339    if body.token.is_empty() {
340        None
341    } else {
342        Some(body.token)
343    }
344}
345
346/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
347async fn resolve_token(
348    client: &reqwest::Client,
349    base: &str,
350    node: &str,
351    explicit: Option<&str>,
352) -> Option<String> {
353    if let Some(t) = explicit {
354        return Some(t.to_owned());
355    }
356    if is_localhost(node) {
357        return discover_token(client, base).await;
358    }
359    None
360}
361
362/// Build an authenticated GET request.
363fn authed_get(
364    client: &reqwest::Client,
365    url: String,
366    token: Option<&str>,
367) -> reqwest::RequestBuilder {
368    let req = client.get(url);
369    if let Some(t) = token {
370        req.bearer_auth(t)
371    } else {
372        req
373    }
374}
375
376/// Build an authenticated POST request.
377fn authed_post(
378    client: &reqwest::Client,
379    url: String,
380    token: Option<&str>,
381) -> reqwest::RequestBuilder {
382    let req = client.post(url);
383    if let Some(t) = token {
384        req.bearer_auth(t)
385    } else {
386        req
387    }
388}
389
390/// Build an authenticated DELETE request.
391fn authed_delete(
392    client: &reqwest::Client,
393    url: String,
394    token: Option<&str>,
395) -> reqwest::RequestBuilder {
396    let req = client.delete(url);
397    if let Some(t) = token {
398        req.bearer_auth(t)
399    } else {
400        req
401    }
402}
403
404/// Fetch session output from the API.
405async fn fetch_output(
406    client: &reqwest::Client,
407    base: &str,
408    name: &str,
409    lines: usize,
410    token: Option<&str>,
411) -> Result<String> {
412    let resp = authed_get(
413        client,
414        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
415        token,
416    )
417    .send()
418    .await?;
419    let text = ok_or_api_error(resp).await?;
420    let output: OutputResponse = serde_json::from_str(&text)?;
421    Ok(output.output)
422}
423
424/// Fetch session status from the API.
425async fn fetch_session_status(
426    client: &reqwest::Client,
427    base: &str,
428    name: &str,
429    token: Option<&str>,
430) -> Result<String> {
431    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
432        .send()
433        .await?;
434    let text = ok_or_api_error(resp).await?;
435    let session: Session = serde_json::from_str(&text)?;
436    Ok(session.status.to_string())
437}
438
439/// Compute the new trailing lines that differ from the previous output.
440///
441/// The output endpoint returns the last N lines from the terminal pane. As new lines
442/// appear, old lines at the top scroll off. We find the overlap between the end
443/// of `prev` and the beginning-to-middle of `new`, then return only the truly new
444/// trailing lines.
445fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
446    if prev.is_empty() {
447        return new;
448    }
449
450    let prev_lines: Vec<&str> = prev.lines().collect();
451    let new_lines: Vec<&str> = new.lines().collect();
452
453    if new_lines.is_empty() {
454        return "";
455    }
456
457    // prev is non-empty (early return above), so last() always succeeds
458    let last_prev = prev_lines[prev_lines.len() - 1];
459
460    // Find the last line of prev in new to determine the overlap boundary
461    for i in (0..new_lines.len()).rev() {
462        if new_lines[i] == last_prev {
463            // Verify contiguous overlap: check that lines before this match too
464            let overlap_len = prev_lines.len().min(i + 1);
465            let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
466            let new_overlap = &new_lines[i + 1 - overlap_len..=i];
467            if prev_tail == new_overlap {
468                if i + 1 < new_lines.len() {
469                    // Return the slice of `new` after the overlap
470                    let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
471                    return new.get(consumed.min(new.len())..).unwrap_or("");
472                }
473                return "";
474            }
475        }
476    }
477
478    // No overlap found — output changed completely, print it all
479    new
480}
481
482/// Follow logs by polling, printing only new output. Returns when the session ends.
483async fn follow_logs(
484    client: &reqwest::Client,
485    base: &str,
486    name: &str,
487    lines: usize,
488    token: Option<&str>,
489    writer: &mut (dyn std::io::Write + Send),
490) -> Result<()> {
491    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
492    write!(writer, "{prev_output}")?;
493
494    loop {
495        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
496
497        // Check session status
498        let status = fetch_session_status(client, base, name, token).await?;
499        let is_terminal = status == "ready" || status == "killed" || status == "lost";
500
501        // Fetch latest output
502        let new_output = fetch_output(client, base, name, lines, token).await?;
503
504        let diff = diff_output(&prev_output, &new_output);
505        if !diff.is_empty() {
506            write!(writer, "{diff}")?;
507        }
508        prev_output = new_output;
509
510        if is_terminal {
511            break;
512        }
513    }
514    Ok(())
515}
516
517// --- Crontab wrapper ---
518
519#[cfg_attr(coverage, allow(dead_code))]
520const CRONTAB_TAG: &str = "#pulpo:";
521
522/// Read the current crontab. Returns empty string if no crontab exists.
523#[cfg(not(coverage))]
524fn read_crontab() -> Result<String> {
525    let output = std::process::Command::new("crontab").arg("-l").output()?;
526    if output.status.success() {
527        Ok(String::from_utf8_lossy(&output.stdout).to_string())
528    } else {
529        Ok(String::new())
530    }
531}
532
533/// Write the given content as the user's crontab.
534#[cfg(not(coverage))]
535fn write_crontab(content: &str) -> Result<()> {
536    use std::io::Write;
537    let mut child = std::process::Command::new("crontab")
538        .arg("-")
539        .stdin(std::process::Stdio::piped())
540        .spawn()?;
541    child
542        .stdin
543        .as_mut()
544        .unwrap()
545        .write_all(content.as_bytes())?;
546    let status = child.wait()?;
547    if !status.success() {
548        anyhow::bail!("crontab write failed");
549    }
550    Ok(())
551}
552
553/// Build the crontab line for a pulpo schedule.
554#[cfg_attr(coverage, allow(dead_code))]
555fn build_crontab_line(name: &str, cron: &str, workdir: &str, command: &str, node: &str) -> String {
556    format!(
557        "{cron} pulpo --node {node} spawn {name} --workdir {workdir} -- {command} {CRONTAB_TAG}{name}\n"
558    )
559}
560
561/// Install a cron schedule into a crontab string. Returns the updated crontab.
562#[cfg_attr(coverage, allow(dead_code))]
563fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
564    let tag = format!("{CRONTAB_TAG}{name}");
565    if crontab.contains(&tag) {
566        anyhow::bail!("schedule \"{name}\" already exists — remove it first");
567    }
568    let mut result = crontab.to_owned();
569    result.push_str(line);
570    Ok(result)
571}
572
573/// Format pulpo crontab entries for display.
574#[cfg_attr(coverage, allow(dead_code))]
575fn crontab_list(crontab: &str) -> String {
576    let entries: Vec<&str> = crontab
577        .lines()
578        .filter(|l| l.contains(CRONTAB_TAG))
579        .collect();
580    if entries.is_empty() {
581        return "No pulpo schedules.".into();
582    }
583    let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
584    for entry in entries {
585        let paused = entry.starts_with('#');
586        let raw = entry.trim_start_matches('#').trim();
587        let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
588        let parts: Vec<&str> = raw.splitn(6, ' ').collect();
589        let cron_expr = if parts.len() >= 5 {
590            parts[..5].join(" ")
591        } else {
592            "?".into()
593        };
594        lines.push(format!(
595            "{:<20} {:<15} {}",
596            name,
597            cron_expr,
598            if paused { "yes" } else { "no" }
599        ));
600    }
601    lines.join("\n")
602}
603
604/// Remove a schedule from a crontab string. Returns the updated crontab.
605#[cfg_attr(coverage, allow(dead_code))]
606fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
607    use std::fmt::Write;
608    let tag = format!("{CRONTAB_TAG}{name}");
609    let filtered =
610        crontab
611            .lines()
612            .filter(|l| !l.contains(&tag))
613            .fold(String::new(), |mut acc, l| {
614                writeln!(acc, "{l}").unwrap();
615                acc
616            });
617    if filtered.len() == crontab.len() {
618        anyhow::bail!("schedule \"{name}\" not found");
619    }
620    Ok(filtered)
621}
622
623/// Pause (comment out) a schedule in a crontab string.
624#[cfg_attr(coverage, allow(dead_code))]
625fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
626    use std::fmt::Write;
627    let tag = format!("{CRONTAB_TAG}{name}");
628    let mut found = false;
629    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
630        if l.contains(&tag) && !l.starts_with('#') {
631            found = true;
632            writeln!(acc, "#{l}").unwrap();
633        } else {
634            writeln!(acc, "{l}").unwrap();
635        }
636        acc
637    });
638    if !found {
639        anyhow::bail!("schedule \"{name}\" not found or already paused");
640    }
641    Ok(updated)
642}
643
644/// Resume (uncomment) a schedule in a crontab string.
645#[cfg_attr(coverage, allow(dead_code))]
646fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
647    use std::fmt::Write;
648    let tag = format!("{CRONTAB_TAG}{name}");
649    let mut found = false;
650    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
651        if l.contains(&tag) && l.starts_with('#') {
652            found = true;
653            writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
654        } else {
655            writeln!(acc, "{l}").unwrap();
656        }
657        acc
658    });
659    if !found {
660        anyhow::bail!("schedule \"{name}\" not found or not paused");
661    }
662    Ok(updated)
663}
664
665/// Execute a schedule subcommand using the crontab wrapper.
666#[cfg(not(coverage))]
667fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
668    match action {
669        ScheduleAction::Install {
670            name,
671            cron,
672            workdir,
673            command,
674        } => {
675            let crontab = read_crontab()?;
676            let joined_command = command.join(" ");
677            let line = build_crontab_line(name, cron, workdir, &joined_command, node);
678            let updated = crontab_install(&crontab, name, &line)?;
679            write_crontab(&updated)?;
680            Ok(format!("Installed schedule \"{name}\""))
681        }
682        ScheduleAction::List => {
683            let crontab = read_crontab()?;
684            Ok(crontab_list(&crontab))
685        }
686        ScheduleAction::Remove { name } => {
687            let crontab = read_crontab()?;
688            let updated = crontab_remove(&crontab, name)?;
689            write_crontab(&updated)?;
690            Ok(format!("Removed schedule \"{name}\""))
691        }
692        ScheduleAction::Pause { name } => {
693            let crontab = read_crontab()?;
694            let updated = crontab_pause(&crontab, name)?;
695            write_crontab(&updated)?;
696            Ok(format!("Paused schedule \"{name}\""))
697        }
698        ScheduleAction::Resume { name } => {
699            let crontab = read_crontab()?;
700            let updated = crontab_resume(&crontab, name)?;
701            write_crontab(&updated)?;
702            Ok(format!("Resumed schedule \"{name}\""))
703        }
704    }
705}
706
707/// Stub for coverage builds — crontab is real I/O.
708#[cfg(coverage)]
709fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
710    Ok(String::new())
711}
712
713/// Execute the given CLI command against the specified node.
714#[allow(clippy::too_many_lines)]
715pub async fn execute(cli: &Cli) -> Result<String> {
716    let url = base_url(&cli.node);
717    let client = reqwest::Client::new();
718    let node = &cli.node;
719    let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;
720
721    match &cli.command {
722        Commands::Attach { name } => {
723            // Fetch session to get status and backend_session_id
724            let resp = authed_get(
725                &client,
726                format!("{url}/api/v1/sessions/{name}"),
727                token.as_deref(),
728            )
729            .send()
730            .await
731            .map_err(|e| friendly_error(&e, node))?;
732            let text = ok_or_api_error(resp).await?;
733            let session: Session = serde_json::from_str(&text)?;
734            match session.status.to_string().as_str() {
735                "lost" => {
736                    anyhow::bail!(
737                        "Session \"{name}\" is lost (agent process died). Resume it first:\n  pulpo resume {name}"
738                    );
739                }
740                "killed" => {
741                    anyhow::bail!(
742                        "Session \"{name}\" is {} — cannot attach to a killed session.",
743                        session.status
744                    );
745                }
746                _ => {}
747            }
748            let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
749            attach_session(&backend_id)?;
750            Ok(format!("Detached from session {name}."))
751        }
752        Commands::Input { name, text } => {
753            let input_text = text.as_deref().unwrap_or("\n");
754            let body = serde_json::json!({ "text": input_text });
755            let resp = authed_post(
756                &client,
757                format!("{url}/api/v1/sessions/{name}/input"),
758                token.as_deref(),
759            )
760            .json(&body)
761            .send()
762            .await
763            .map_err(|e| friendly_error(&e, node))?;
764            ok_or_api_error(resp).await?;
765            Ok(format!("Sent input to session {name}."))
766        }
767        Commands::List => {
768            let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
769                .send()
770                .await
771                .map_err(|e| friendly_error(&e, node))?;
772            let text = ok_or_api_error(resp).await?;
773            let sessions: Vec<Session> = serde_json::from_str(&text)?;
774            Ok(format_sessions(&sessions))
775        }
776        Commands::Nodes => {
777            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
778                .send()
779                .await
780                .map_err(|e| friendly_error(&e, node))?;
781            let text = ok_or_api_error(resp).await?;
782            let resp: PeersResponse = serde_json::from_str(&text)?;
783            Ok(format_nodes(&resp))
784        }
785        Commands::Spawn {
786            workdir,
787            name,
788            ink,
789            description,
790            detach,
791            command,
792        } => {
793            let cmd = if command.is_empty() {
794                None
795            } else {
796                Some(command.join(" "))
797            };
798            // Resolve workdir: --workdir flag > current directory
799            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
800                std::env::current_dir()
801                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
802            });
803            let mut body = serde_json::json!({
804                "name": name,
805                "workdir": resolved_workdir,
806            });
807            if let Some(c) = &cmd {
808                body["command"] = serde_json::json!(c);
809            }
810            if let Some(i) = ink {
811                body["ink"] = serde_json::json!(i);
812            }
813            if let Some(d) = description {
814                body["description"] = serde_json::json!(d);
815            }
816            let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
817                .json(&body)
818                .send()
819                .await
820                .map_err(|e| friendly_error(&e, node))?;
821            let text = ok_or_api_error(resp).await?;
822            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
823            let msg = format!(
824                "Created session \"{}\" ({})",
825                resp.session.name, resp.session.id
826            );
827            if !detach {
828                let backend_id = resp
829                    .session
830                    .backend_session_id
831                    .as_deref()
832                    .unwrap_or(&resp.session.name);
833                eprintln!("{msg}");
834                attach_session(backend_id)?;
835                return Ok(format!("Detached from session \"{}\".", resp.session.name));
836            }
837            Ok(msg)
838        }
839        Commands::Kill { name } => {
840            let resp = authed_post(
841                &client,
842                format!("{url}/api/v1/sessions/{name}/kill"),
843                token.as_deref(),
844            )
845            .send()
846            .await
847            .map_err(|e| friendly_error(&e, node))?;
848            ok_or_api_error(resp).await?;
849            Ok(format!("Session {name} killed."))
850        }
851        Commands::Delete { name } => {
852            let resp = authed_delete(
853                &client,
854                format!("{url}/api/v1/sessions/{name}"),
855                token.as_deref(),
856            )
857            .send()
858            .await
859            .map_err(|e| friendly_error(&e, node))?;
860            ok_or_api_error(resp).await?;
861            Ok(format!("Session {name} deleted."))
862        }
863        Commands::Logs {
864            name,
865            lines,
866            follow,
867        } => {
868            if *follow {
869                let mut stdout = std::io::stdout();
870                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
871                    .await
872                    .map_err(|e| {
873                        // Unwrap reqwest errors to friendly messages
874                        match e.downcast::<reqwest::Error>() {
875                            Ok(re) => friendly_error(&re, node),
876                            Err(other) => other,
877                        }
878                    })?;
879                Ok(String::new())
880            } else {
881                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
882                    .await
883                    .map_err(|e| match e.downcast::<reqwest::Error>() {
884                        Ok(re) => friendly_error(&re, node),
885                        Err(other) => other,
886                    })?;
887                Ok(output)
888            }
889        }
890        Commands::Interventions { name } => {
891            let resp = authed_get(
892                &client,
893                format!("{url}/api/v1/sessions/{name}/interventions"),
894                token.as_deref(),
895            )
896            .send()
897            .await
898            .map_err(|e| friendly_error(&e, node))?;
899            let text = ok_or_api_error(resp).await?;
900            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
901            Ok(format_interventions(&events))
902        }
903        Commands::Ui => {
904            let dashboard = base_url(&cli.node);
905            open_browser(&dashboard)?;
906            Ok(format!("Opening {dashboard}"))
907        }
908        Commands::Resume { name } => {
909            let resp = authed_post(
910                &client,
911                format!("{url}/api/v1/sessions/{name}/resume"),
912                token.as_deref(),
913            )
914            .send()
915            .await
916            .map_err(|e| friendly_error(&e, node))?;
917            let text = ok_or_api_error(resp).await?;
918            let session: Session = serde_json::from_str(&text)?;
919            let backend_id = session
920                .backend_session_id
921                .as_deref()
922                .unwrap_or(&session.name);
923            eprintln!("Resumed session \"{}\"", session.name);
924            attach_session(backend_id)?;
925            Ok(format!("Detached from session \"{}\".", session.name))
926        }
927        Commands::Schedule { action } => execute_schedule(action, node),
928    }
929}
930
931#[cfg(test)]
932mod tests {
933    use super::*;
934
935    #[test]
936    fn test_base_url() {
937        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
938        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
939    }
940
941    #[test]
942    fn test_cli_parse_list() {
943        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
944        assert_eq!(cli.node, "localhost:7433");
945        assert!(matches!(cli.command, Commands::List));
946    }
947
948    #[test]
949    fn test_cli_parse_nodes() {
950        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
951        assert!(matches!(cli.command, Commands::Nodes));
952    }
953
954    #[test]
955    fn test_cli_parse_ui() {
956        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
957        assert!(matches!(cli.command, Commands::Ui));
958    }
959
960    #[test]
961    fn test_cli_parse_ui_custom_node() {
962        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
963        assert!(matches!(cli.command, Commands::Ui));
964        assert_eq!(cli.node, "mac-mini:7433");
965    }
966
967    #[test]
968    fn test_build_open_command() {
969        let cmd = build_open_command("http://localhost:7433");
970        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
971        assert_eq!(args, vec!["http://localhost:7433"]);
972        #[cfg(target_os = "macos")]
973        assert_eq!(cmd.get_program(), "open");
974        #[cfg(target_os = "linux")]
975        assert_eq!(cmd.get_program(), "xdg-open");
976    }
977
978    #[test]
979    fn test_cli_parse_spawn() {
980        let cli = Cli::try_parse_from([
981            "pulpo",
982            "spawn",
983            "my-task",
984            "--workdir",
985            "/tmp/repo",
986            "--",
987            "claude",
988            "-p",
989            "Fix the bug",
990        ])
991        .unwrap();
992        assert!(matches!(
993            &cli.command,
994            Commands::Spawn { name, workdir, command, .. }
995                if name == "my-task" && workdir.as_deref() == Some("/tmp/repo")
996                && command == &["claude", "-p", "Fix the bug"]
997        ));
998    }
999
1000    #[test]
1001    fn test_cli_parse_spawn_with_ink() {
1002        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1003        assert!(matches!(
1004            &cli.command,
1005            Commands::Spawn { ink, .. } if ink.as_deref() == Some("coder")
1006        ));
1007    }
1008
1009    #[test]
1010    fn test_cli_parse_spawn_with_description() {
1011        let cli =
1012            Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1013                .unwrap();
1014        assert!(matches!(
1015            &cli.command,
1016            Commands::Spawn { description, .. } if description.as_deref() == Some("Fix the bug")
1017        ));
1018    }
1019
1020    #[test]
1021    fn test_cli_parse_spawn_name_positional() {
1022        let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1023        assert!(matches!(
1024            &cli.command,
1025            Commands::Spawn { name, command, .. }
1026                if name == "portal" && command == &["echo", "hello"]
1027        ));
1028    }
1029
1030    #[test]
1031    fn test_cli_parse_spawn_no_command() {
1032        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1033        assert!(matches!(
1034            &cli.command,
1035            Commands::Spawn { command, .. } if command.is_empty()
1036        ));
1037    }
1038
1039    #[test]
1040    fn test_cli_parse_spawn_detach() {
1041        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
1042        assert!(matches!(
1043            &cli.command,
1044            Commands::Spawn { detach, .. } if *detach
1045        ));
1046    }
1047
1048    #[test]
1049    fn test_cli_parse_spawn_detach_short() {
1050        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
1051        assert!(matches!(
1052            &cli.command,
1053            Commands::Spawn { detach, .. } if *detach
1054        ));
1055    }
1056
1057    #[test]
1058    fn test_cli_parse_spawn_detach_default() {
1059        let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1060        assert!(matches!(
1061            &cli.command,
1062            Commands::Spawn { detach, .. } if !detach
1063        ));
1064    }
1065
1066    #[test]
1067    fn test_cli_parse_logs() {
1068        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1069        assert!(matches!(
1070            &cli.command,
1071            Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1072        ));
1073    }
1074
1075    #[test]
1076    fn test_cli_parse_logs_with_lines() {
1077        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1078        assert!(matches!(
1079            &cli.command,
1080            Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 50 && !follow
1081        ));
1082    }
1083
1084    #[test]
1085    fn test_cli_parse_logs_follow() {
1086        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1087        assert!(matches!(
1088            &cli.command,
1089            Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1090        ));
1091    }
1092
1093    #[test]
1094    fn test_cli_parse_logs_follow_short() {
1095        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1096        assert!(matches!(
1097            &cli.command,
1098            Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1099        ));
1100    }
1101
1102    #[test]
1103    fn test_cli_parse_kill() {
1104        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1105        assert!(matches!(
1106            &cli.command,
1107            Commands::Kill { name } if name == "my-session"
1108        ));
1109    }
1110
1111    #[test]
1112    fn test_cli_parse_delete() {
1113        let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1114        assert!(matches!(
1115            &cli.command,
1116            Commands::Delete { name } if name == "my-session"
1117        ));
1118    }
1119
1120    #[test]
1121    fn test_cli_parse_resume() {
1122        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1123        assert!(matches!(
1124            &cli.command,
1125            Commands::Resume { name } if name == "my-session"
1126        ));
1127    }
1128
1129    #[test]
1130    fn test_cli_parse_input() {
1131        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1132        assert!(matches!(
1133            &cli.command,
1134            Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("yes")
1135        ));
1136    }
1137
1138    #[test]
1139    fn test_cli_parse_input_no_text() {
1140        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1141        assert!(matches!(
1142            &cli.command,
1143            Commands::Input { name, text } if name == "my-session" && text.is_none()
1144        ));
1145    }
1146
1147    #[test]
1148    fn test_cli_parse_input_alias() {
1149        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1150        assert!(matches!(
1151            &cli.command,
1152            Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("y")
1153        ));
1154    }
1155
1156    #[test]
1157    fn test_cli_parse_custom_node() {
1158        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1159        assert_eq!(cli.node, "win-pc:8080");
1160    }
1161
1162    #[test]
1163    fn test_cli_version() {
1164        let result = Cli::try_parse_from(["pulpo", "--version"]);
1165        // clap exits with an error (kind DisplayVersion) when --version is used
1166        let err = result.unwrap_err();
1167        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1168    }
1169
1170    #[test]
1171    fn test_cli_parse_no_subcommand_fails() {
1172        let result = Cli::try_parse_from(["pulpo"]);
1173        assert!(result.is_err());
1174    }
1175
1176    #[test]
1177    fn test_cli_debug() {
1178        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1179        let debug = format!("{cli:?}");
1180        assert!(debug.contains("List"));
1181    }
1182
1183    #[test]
1184    fn test_commands_debug() {
1185        let cmd = Commands::List;
1186        assert_eq!(format!("{cmd:?}"), "List");
1187    }
1188
1189    /// A valid Session JSON for test responses.
1190    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1191
1192    /// A valid `CreateSessionResponse` JSON wrapping the session.
1193    fn test_create_response_json() -> String {
1194        format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1195    }
1196
1197    /// Start a lightweight test HTTP server and return its address.
1198    async fn start_test_server() -> String {
1199        use axum::http::StatusCode;
1200        use axum::{
1201            Json, Router,
1202            routing::{get, post},
1203        };
1204
1205        let create_json = test_create_response_json();
1206
1207        let app = Router::new()
1208            .route(
1209                "/api/v1/sessions",
1210                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1211                    (StatusCode::CREATED, create_json.clone())
1212                }),
1213            )
1214            .route(
1215                "/api/v1/sessions/{id}",
1216                get(|| async { TEST_SESSION_JSON.to_owned() })
1217                    .delete(|| async { StatusCode::NO_CONTENT }),
1218            )
1219            .route(
1220                "/api/v1/sessions/{id}/kill",
1221                post(|| async { StatusCode::NO_CONTENT }),
1222            )
1223            .route(
1224                "/api/v1/sessions/{id}/output",
1225                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1226            )
1227            .route(
1228                "/api/v1/peers",
1229                get(|| async {
1230                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1231                }),
1232            )
1233            .route(
1234                "/api/v1/sessions/{id}/resume",
1235                axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1236            )
1237            .route(
1238                "/api/v1/sessions/{id}/interventions",
1239                get(|| async { "[]".to_owned() }),
1240            )
1241            .route(
1242                "/api/v1/sessions/{id}/input",
1243                post(|| async { StatusCode::NO_CONTENT }),
1244            );
1245
1246        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1247        let addr = listener.local_addr().unwrap();
1248        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1249        format!("127.0.0.1:{}", addr.port())
1250    }
1251
1252    #[tokio::test]
1253    async fn test_execute_list_success() {
1254        let node = start_test_server().await;
1255        let cli = Cli {
1256            node,
1257            token: None,
1258            command: Commands::List,
1259        };
1260        let result = execute(&cli).await.unwrap();
1261        assert_eq!(result, "No sessions.");
1262    }
1263
1264    #[tokio::test]
1265    async fn test_execute_nodes_success() {
1266        let node = start_test_server().await;
1267        let cli = Cli {
1268            node,
1269            token: None,
1270            command: Commands::Nodes,
1271        };
1272        let result = execute(&cli).await.unwrap();
1273        assert!(result.contains("test"));
1274        assert!(result.contains("(local)"));
1275        assert!(result.contains("NAME"));
1276    }
1277
1278    #[tokio::test]
1279    async fn test_execute_spawn_success() {
1280        let node = start_test_server().await;
1281        let cli = Cli {
1282            node,
1283            token: None,
1284            command: Commands::Spawn {
1285                name: "test".into(),
1286                workdir: Some("/tmp/repo".into()),
1287                ink: None,
1288                description: None,
1289                detach: true,
1290                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1291            },
1292        };
1293        let result = execute(&cli).await.unwrap();
1294        assert!(result.contains("Created session"));
1295        assert!(result.contains("repo"));
1296    }
1297
1298    #[tokio::test]
1299    async fn test_execute_spawn_with_all_flags() {
1300        let node = start_test_server().await;
1301        let cli = Cli {
1302            node,
1303            token: None,
1304            command: Commands::Spawn {
1305                name: "test".into(),
1306                workdir: Some("/tmp/repo".into()),
1307                ink: Some("coder".into()),
1308                description: Some("Fix the bug".into()),
1309                detach: true,
1310                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1311            },
1312        };
1313        let result = execute(&cli).await.unwrap();
1314        assert!(result.contains("Created session"));
1315    }
1316
1317    #[tokio::test]
1318    async fn test_execute_spawn_no_command() {
1319        let node = start_test_server().await;
1320        let cli = Cli {
1321            node,
1322            token: None,
1323            command: Commands::Spawn {
1324                name: "test".into(),
1325                workdir: Some("/tmp/repo".into()),
1326                ink: None,
1327                description: None,
1328                detach: true,
1329                command: vec![],
1330            },
1331        };
1332        let result = execute(&cli).await.unwrap();
1333        assert!(result.contains("Created session"));
1334    }
1335
1336    #[tokio::test]
1337    async fn test_execute_spawn_with_name() {
1338        let node = start_test_server().await;
1339        let cli = Cli {
1340            node,
1341            token: None,
1342            command: Commands::Spawn {
1343                name: "my-task".into(),
1344                workdir: Some("/tmp/repo".into()),
1345                ink: None,
1346                description: None,
1347                detach: true,
1348                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1349            },
1350        };
1351        let result = execute(&cli).await.unwrap();
1352        assert!(result.contains("Created session"));
1353    }
1354
1355    #[tokio::test]
1356    async fn test_execute_spawn_auto_attach() {
1357        let node = start_test_server().await;
1358        let cli = Cli {
1359            node,
1360            token: None,
1361            command: Commands::Spawn {
1362                name: "test".into(),
1363                workdir: Some("/tmp/repo".into()),
1364                ink: None,
1365                description: None,
1366                detach: false,
1367                command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1368            },
1369        };
1370        let result = execute(&cli).await.unwrap();
1371        // When not detached, spawn prints creation to stderr and returns detach message
1372        assert!(result.contains("Detached from session"));
1373    }
1374
1375    #[tokio::test]
1376    async fn test_execute_kill_success() {
1377        let node = start_test_server().await;
1378        let cli = Cli {
1379            node,
1380            token: None,
1381            command: Commands::Kill {
1382                name: "test-session".into(),
1383            },
1384        };
1385        let result = execute(&cli).await.unwrap();
1386        assert!(result.contains("killed"));
1387    }
1388
1389    #[tokio::test]
1390    async fn test_execute_delete_success() {
1391        let node = start_test_server().await;
1392        let cli = Cli {
1393            node,
1394            token: None,
1395            command: Commands::Delete {
1396                name: "test-session".into(),
1397            },
1398        };
1399        let result = execute(&cli).await.unwrap();
1400        assert!(result.contains("deleted"));
1401    }
1402
1403    #[tokio::test]
1404    async fn test_execute_logs_success() {
1405        let node = start_test_server().await;
1406        let cli = Cli {
1407            node,
1408            token: None,
1409            command: Commands::Logs {
1410                name: "test-session".into(),
1411                lines: 50,
1412                follow: false,
1413            },
1414        };
1415        let result = execute(&cli).await.unwrap();
1416        assert!(result.contains("test output"));
1417    }
1418
1419    #[tokio::test]
1420    async fn test_execute_list_connection_refused() {
1421        let cli = Cli {
1422            node: "localhost:1".into(),
1423            token: None,
1424            command: Commands::List,
1425        };
1426        let result = execute(&cli).await;
1427        let err = result.unwrap_err().to_string();
1428        assert!(
1429            err.contains("Could not connect to pulpod"),
1430            "Expected friendly error, got: {err}"
1431        );
1432        assert!(err.contains("localhost:1"));
1433    }
1434
1435    #[tokio::test]
1436    async fn test_execute_nodes_connection_refused() {
1437        let cli = Cli {
1438            node: "localhost:1".into(),
1439            token: None,
1440            command: Commands::Nodes,
1441        };
1442        let result = execute(&cli).await;
1443        let err = result.unwrap_err().to_string();
1444        assert!(err.contains("Could not connect to pulpod"));
1445    }
1446
1447    #[tokio::test]
1448    async fn test_execute_kill_error_response() {
1449        use axum::{Router, http::StatusCode, routing::post};
1450
1451        let app = Router::new().route(
1452            "/api/v1/sessions/{id}/kill",
1453            post(|| async {
1454                (
1455                    StatusCode::NOT_FOUND,
1456                    "{\"error\":\"session not found: test-session\"}",
1457                )
1458            }),
1459        );
1460        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1461        let addr = listener.local_addr().unwrap();
1462        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1463        let node = format!("127.0.0.1:{}", addr.port());
1464
1465        let cli = Cli {
1466            node,
1467            token: None,
1468            command: Commands::Kill {
1469                name: "test-session".into(),
1470            },
1471        };
1472        let err = execute(&cli).await.unwrap_err();
1473        assert_eq!(err.to_string(), "session not found: test-session");
1474    }
1475
1476    #[tokio::test]
1477    async fn test_execute_delete_error_response() {
1478        use axum::{Router, http::StatusCode, routing::delete};
1479
1480        let app = Router::new().route(
1481            "/api/v1/sessions/{id}",
1482            delete(|| async {
1483                (
1484                    StatusCode::CONFLICT,
1485                    "{\"error\":\"cannot delete session in 'running' state\"}",
1486                )
1487            }),
1488        );
1489        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1490        let addr = listener.local_addr().unwrap();
1491        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1492        let node = format!("127.0.0.1:{}", addr.port());
1493
1494        let cli = Cli {
1495            node,
1496            token: None,
1497            command: Commands::Delete {
1498                name: "test-session".into(),
1499            },
1500        };
1501        let err = execute(&cli).await.unwrap_err();
1502        assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1503    }
1504
1505    #[tokio::test]
1506    async fn test_execute_logs_error_response() {
1507        use axum::{Router, http::StatusCode, routing::get};
1508
1509        let app = Router::new().route(
1510            "/api/v1/sessions/{id}/output",
1511            get(|| async {
1512                (
1513                    StatusCode::NOT_FOUND,
1514                    "{\"error\":\"session not found: ghost\"}",
1515                )
1516            }),
1517        );
1518        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1519        let addr = listener.local_addr().unwrap();
1520        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1521        let node = format!("127.0.0.1:{}", addr.port());
1522
1523        let cli = Cli {
1524            node,
1525            token: None,
1526            command: Commands::Logs {
1527                name: "ghost".into(),
1528                lines: 50,
1529                follow: false,
1530            },
1531        };
1532        let err = execute(&cli).await.unwrap_err();
1533        assert_eq!(err.to_string(), "session not found: ghost");
1534    }
1535
1536    #[tokio::test]
1537    async fn test_execute_resume_error_response() {
1538        use axum::{Router, http::StatusCode, routing::post};
1539
1540        let app = Router::new().route(
1541            "/api/v1/sessions/{id}/resume",
1542            post(|| async {
1543                (
1544                    StatusCode::BAD_REQUEST,
1545                    "{\"error\":\"session is not lost (status: active)\"}",
1546                )
1547            }),
1548        );
1549        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1550        let addr = listener.local_addr().unwrap();
1551        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1552        let node = format!("127.0.0.1:{}", addr.port());
1553
1554        let cli = Cli {
1555            node,
1556            token: None,
1557            command: Commands::Resume {
1558                name: "test-session".into(),
1559            },
1560        };
1561        let err = execute(&cli).await.unwrap_err();
1562        assert_eq!(err.to_string(), "session is not lost (status: active)");
1563    }
1564
1565    #[tokio::test]
1566    async fn test_execute_spawn_error_response() {
1567        use axum::{Router, http::StatusCode, routing::post};
1568
1569        let app = Router::new().route(
1570            "/api/v1/sessions",
1571            post(|| async {
1572                (
1573                    StatusCode::INTERNAL_SERVER_ERROR,
1574                    "{\"error\":\"failed to spawn session\"}",
1575                )
1576            }),
1577        );
1578        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1579        let addr = listener.local_addr().unwrap();
1580        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1581        let node = format!("127.0.0.1:{}", addr.port());
1582
1583        let cli = Cli {
1584            node,
1585            token: None,
1586            command: Commands::Spawn {
1587                name: "test".into(),
1588                workdir: Some("/tmp/repo".into()),
1589                ink: None,
1590                description: None,
1591                detach: true,
1592                command: vec!["test".into()],
1593            },
1594        };
1595        let err = execute(&cli).await.unwrap_err();
1596        assert_eq!(err.to_string(), "failed to spawn session");
1597    }
1598
1599    #[tokio::test]
1600    async fn test_execute_interventions_error_response() {
1601        use axum::{Router, http::StatusCode, routing::get};
1602
1603        let app = Router::new().route(
1604            "/api/v1/sessions/{id}/interventions",
1605            get(|| async {
1606                (
1607                    StatusCode::NOT_FOUND,
1608                    "{\"error\":\"session not found: ghost\"}",
1609                )
1610            }),
1611        );
1612        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1613        let addr = listener.local_addr().unwrap();
1614        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1615        let node = format!("127.0.0.1:{}", addr.port());
1616
1617        let cli = Cli {
1618            node,
1619            token: None,
1620            command: Commands::Interventions {
1621                name: "ghost".into(),
1622            },
1623        };
1624        let err = execute(&cli).await.unwrap_err();
1625        assert_eq!(err.to_string(), "session not found: ghost");
1626    }
1627
1628    #[tokio::test]
1629    async fn test_execute_resume_success() {
1630        let node = start_test_server().await;
1631        let cli = Cli {
1632            node,
1633            token: None,
1634            command: Commands::Resume {
1635                name: "test-session".into(),
1636            },
1637        };
1638        let result = execute(&cli).await.unwrap();
1639        assert!(result.contains("Detached from session"));
1640    }
1641
1642    #[tokio::test]
1643    async fn test_execute_input_success() {
1644        let node = start_test_server().await;
1645        let cli = Cli {
1646            node,
1647            token: None,
1648            command: Commands::Input {
1649                name: "test-session".into(),
1650                text: Some("yes".into()),
1651            },
1652        };
1653        let result = execute(&cli).await.unwrap();
1654        assert!(result.contains("Sent input to session test-session"));
1655    }
1656
1657    #[tokio::test]
1658    async fn test_execute_input_no_text() {
1659        let node = start_test_server().await;
1660        let cli = Cli {
1661            node,
1662            token: None,
1663            command: Commands::Input {
1664                name: "test-session".into(),
1665                text: None,
1666            },
1667        };
1668        let result = execute(&cli).await.unwrap();
1669        assert!(result.contains("Sent input to session test-session"));
1670    }
1671
1672    #[tokio::test]
1673    async fn test_execute_input_connection_refused() {
1674        let cli = Cli {
1675            node: "localhost:1".into(),
1676            token: None,
1677            command: Commands::Input {
1678                name: "test".into(),
1679                text: Some("y".into()),
1680            },
1681        };
1682        let result = execute(&cli).await;
1683        let err = result.unwrap_err().to_string();
1684        assert!(err.contains("Could not connect to pulpod"));
1685    }
1686
1687    #[tokio::test]
1688    async fn test_execute_input_error_response() {
1689        use axum::{Router, http::StatusCode, routing::post};
1690
1691        let app = Router::new().route(
1692            "/api/v1/sessions/{id}/input",
1693            post(|| async {
1694                (
1695                    StatusCode::NOT_FOUND,
1696                    "{\"error\":\"session not found: ghost\"}",
1697                )
1698            }),
1699        );
1700        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1701        let addr = listener.local_addr().unwrap();
1702        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1703        let node = format!("127.0.0.1:{}", addr.port());
1704
1705        let cli = Cli {
1706            node,
1707            token: None,
1708            command: Commands::Input {
1709                name: "ghost".into(),
1710                text: Some("y".into()),
1711            },
1712        };
1713        let err = execute(&cli).await.unwrap_err();
1714        assert_eq!(err.to_string(), "session not found: ghost");
1715    }
1716
1717    #[tokio::test]
1718    async fn test_execute_ui() {
1719        let cli = Cli {
1720            node: "localhost:7433".into(),
1721            token: None,
1722            command: Commands::Ui,
1723        };
1724        let result = execute(&cli).await.unwrap();
1725        assert!(result.contains("Opening"));
1726        assert!(result.contains("http://localhost:7433"));
1727    }
1728
1729    #[tokio::test]
1730    async fn test_execute_ui_custom_node() {
1731        let cli = Cli {
1732            node: "mac-mini:7433".into(),
1733            token: None,
1734            command: Commands::Ui,
1735        };
1736        let result = execute(&cli).await.unwrap();
1737        assert!(result.contains("http://mac-mini:7433"));
1738    }
1739
1740    #[test]
1741    fn test_format_sessions_empty() {
1742        assert_eq!(format_sessions(&[]), "No sessions.");
1743    }
1744
1745    #[test]
1746    fn test_format_sessions_with_data() {
1747        use chrono::Utc;
1748        use pulpo_common::session::SessionStatus;
1749        use uuid::Uuid;
1750
1751        let sessions = vec![Session {
1752            id: Uuid::nil(),
1753            name: "my-api".into(),
1754            workdir: "/tmp/repo".into(),
1755            command: "claude -p 'Fix the bug'".into(),
1756            description: Some("Fix the bug".into()),
1757            status: SessionStatus::Active,
1758            exit_code: None,
1759            backend_session_id: None,
1760            output_snapshot: None,
1761            metadata: None,
1762            ink: None,
1763            intervention_code: None,
1764            intervention_reason: None,
1765            intervention_at: None,
1766            last_output_at: None,
1767            idle_since: None,
1768            created_at: Utc::now(),
1769            updated_at: Utc::now(),
1770        }];
1771        let output = format_sessions(&sessions);
1772        assert!(output.contains("NAME"));
1773        assert!(output.contains("COMMAND"));
1774        assert!(output.contains("my-api"));
1775        assert!(output.contains("active"));
1776        assert!(output.contains("claude -p 'Fix the bug'"));
1777    }
1778
1779    #[test]
1780    fn test_format_sessions_long_command_truncated() {
1781        use chrono::Utc;
1782        use pulpo_common::session::SessionStatus;
1783        use uuid::Uuid;
1784
1785        let sessions = vec![Session {
1786            id: Uuid::nil(),
1787            name: "test".into(),
1788            workdir: "/tmp".into(),
1789            command:
1790                "claude -p 'A very long command that exceeds fifty characters in total length here'"
1791                    .into(),
1792            description: None,
1793            status: SessionStatus::Ready,
1794            exit_code: None,
1795            backend_session_id: None,
1796            output_snapshot: None,
1797            metadata: None,
1798            ink: None,
1799            intervention_code: None,
1800            intervention_reason: None,
1801            intervention_at: None,
1802            last_output_at: None,
1803            idle_since: None,
1804            created_at: Utc::now(),
1805            updated_at: Utc::now(),
1806        }];
1807        let output = format_sessions(&sessions);
1808        assert!(output.contains("..."));
1809    }
1810
1811    #[test]
1812    fn test_format_nodes() {
1813        use pulpo_common::node::NodeInfo;
1814        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
1815
1816        let resp = PeersResponse {
1817            local: NodeInfo {
1818                name: "mac-mini".into(),
1819                hostname: "h".into(),
1820                os: "macos".into(),
1821                arch: "arm64".into(),
1822                cpus: 8,
1823                memory_mb: 16384,
1824                gpu: None,
1825            },
1826            peers: vec![PeerInfo {
1827                name: "win-pc".into(),
1828                address: "win-pc:7433".into(),
1829                status: PeerStatus::Online,
1830                node_info: None,
1831                session_count: Some(3),
1832                source: PeerSource::Configured,
1833            }],
1834        };
1835        let output = format_nodes(&resp);
1836        assert!(output.contains("mac-mini"));
1837        assert!(output.contains("(local)"));
1838        assert!(output.contains("win-pc"));
1839        assert!(output.contains('3'));
1840    }
1841
1842    #[test]
1843    fn test_format_nodes_no_session_count() {
1844        use pulpo_common::node::NodeInfo;
1845        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
1846
1847        let resp = PeersResponse {
1848            local: NodeInfo {
1849                name: "local".into(),
1850                hostname: "h".into(),
1851                os: "linux".into(),
1852                arch: "x86_64".into(),
1853                cpus: 4,
1854                memory_mb: 8192,
1855                gpu: None,
1856            },
1857            peers: vec![PeerInfo {
1858                name: "peer".into(),
1859                address: "peer:7433".into(),
1860                status: PeerStatus::Offline,
1861                node_info: None,
1862                session_count: None,
1863                source: PeerSource::Configured,
1864            }],
1865        };
1866        let output = format_nodes(&resp);
1867        assert!(output.contains("offline"));
1868        // No session count → shows "-"
1869        let lines: Vec<&str> = output.lines().collect();
1870        assert!(lines[2].contains('-'));
1871    }
1872
1873    #[tokio::test]
1874    async fn test_execute_resume_connection_refused() {
1875        let cli = Cli {
1876            node: "localhost:1".into(),
1877            token: None,
1878            command: Commands::Resume {
1879                name: "test".into(),
1880            },
1881        };
1882        let result = execute(&cli).await;
1883        let err = result.unwrap_err().to_string();
1884        assert!(err.contains("Could not connect to pulpod"));
1885    }
1886
1887    #[tokio::test]
1888    async fn test_execute_spawn_connection_refused() {
1889        let cli = Cli {
1890            node: "localhost:1".into(),
1891            token: None,
1892            command: Commands::Spawn {
1893                name: "test".into(),
1894                workdir: Some("/tmp".into()),
1895                ink: None,
1896                description: None,
1897                detach: true,
1898                command: vec!["test".into()],
1899            },
1900        };
1901        let result = execute(&cli).await;
1902        let err = result.unwrap_err().to_string();
1903        assert!(err.contains("Could not connect to pulpod"));
1904    }
1905
1906    #[tokio::test]
1907    async fn test_execute_kill_connection_refused() {
1908        let cli = Cli {
1909            node: "localhost:1".into(),
1910            token: None,
1911            command: Commands::Kill {
1912                name: "test".into(),
1913            },
1914        };
1915        let result = execute(&cli).await;
1916        let err = result.unwrap_err().to_string();
1917        assert!(err.contains("Could not connect to pulpod"));
1918    }
1919
1920    #[tokio::test]
1921    async fn test_execute_delete_connection_refused() {
1922        let cli = Cli {
1923            node: "localhost:1".into(),
1924            token: None,
1925            command: Commands::Delete {
1926                name: "test".into(),
1927            },
1928        };
1929        let result = execute(&cli).await;
1930        let err = result.unwrap_err().to_string();
1931        assert!(err.contains("Could not connect to pulpod"));
1932    }
1933
1934    #[tokio::test]
1935    async fn test_execute_logs_connection_refused() {
1936        let cli = Cli {
1937            node: "localhost:1".into(),
1938            token: None,
1939            command: Commands::Logs {
1940                name: "test".into(),
1941                lines: 50,
1942                follow: false,
1943            },
1944        };
1945        let result = execute(&cli).await;
1946        let err = result.unwrap_err().to_string();
1947        assert!(err.contains("Could not connect to pulpod"));
1948    }
1949
1950    #[tokio::test]
1951    async fn test_friendly_error_connect() {
1952        // Make a request to a closed port to get a connect error
1953        let err = reqwest::Client::new()
1954            .get("http://127.0.0.1:1")
1955            .send()
1956            .await
1957            .unwrap_err();
1958        let friendly = friendly_error(&err, "test-node:1");
1959        let msg = friendly.to_string();
1960        assert!(
1961            msg.contains("Could not connect"),
1962            "Expected connect message, got: {msg}"
1963        );
1964    }
1965
1966    #[tokio::test]
1967    async fn test_friendly_error_other() {
1968        // A request to an invalid URL creates a builder error, not a connect error
1969        let err = reqwest::Client::new()
1970            .get("http://[::invalid::url")
1971            .send()
1972            .await
1973            .unwrap_err();
1974        let friendly = friendly_error(&err, "bad-host");
1975        let msg = friendly.to_string();
1976        assert!(
1977            msg.contains("Network error"),
1978            "Expected network error message, got: {msg}"
1979        );
1980        assert!(msg.contains("bad-host"));
1981    }
1982
1983    // -- Auth helper tests --
1984
1985    #[test]
1986    fn test_is_localhost_variants() {
1987        assert!(is_localhost("localhost:7433"));
1988        assert!(is_localhost("127.0.0.1:7433"));
1989        assert!(is_localhost("[::1]:7433"));
1990        assert!(is_localhost("::1"));
1991        assert!(is_localhost("localhost"));
1992        assert!(!is_localhost("mac-mini:7433"));
1993        assert!(!is_localhost("192.168.1.100:7433"));
1994    }
1995
1996    #[test]
1997    fn test_authed_get_with_token() {
1998        let client = reqwest::Client::new();
1999        let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2000            .build()
2001            .unwrap();
2002        let auth = req
2003            .headers()
2004            .get("authorization")
2005            .unwrap()
2006            .to_str()
2007            .unwrap();
2008        assert_eq!(auth, "Bearer tok");
2009    }
2010
2011    #[test]
2012    fn test_authed_get_without_token() {
2013        let client = reqwest::Client::new();
2014        let req = authed_get(&client, "http://h:1/api".into(), None)
2015            .build()
2016            .unwrap();
2017        assert!(req.headers().get("authorization").is_none());
2018    }
2019
2020    #[test]
2021    fn test_authed_post_with_token() {
2022        let client = reqwest::Client::new();
2023        let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2024            .build()
2025            .unwrap();
2026        let auth = req
2027            .headers()
2028            .get("authorization")
2029            .unwrap()
2030            .to_str()
2031            .unwrap();
2032        assert_eq!(auth, "Bearer secret");
2033    }
2034
2035    #[test]
2036    fn test_authed_post_without_token() {
2037        let client = reqwest::Client::new();
2038        let req = authed_post(&client, "http://h:1/api".into(), None)
2039            .build()
2040            .unwrap();
2041        assert!(req.headers().get("authorization").is_none());
2042    }
2043
2044    #[test]
2045    fn test_authed_delete_with_token() {
2046        let client = reqwest::Client::new();
2047        let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2048            .build()
2049            .unwrap();
2050        let auth = req
2051            .headers()
2052            .get("authorization")
2053            .unwrap()
2054            .to_str()
2055            .unwrap();
2056        assert_eq!(auth, "Bearer del-tok");
2057    }
2058
2059    #[test]
2060    fn test_authed_delete_without_token() {
2061        let client = reqwest::Client::new();
2062        let req = authed_delete(&client, "http://h:1/api".into(), None)
2063            .build()
2064            .unwrap();
2065        assert!(req.headers().get("authorization").is_none());
2066    }
2067
2068    #[tokio::test]
2069    async fn test_resolve_token_explicit() {
2070        let client = reqwest::Client::new();
2071        let token =
2072            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2073        assert_eq!(token, Some("my-tok".into()));
2074    }
2075
2076    #[tokio::test]
2077    async fn test_resolve_token_remote_no_explicit() {
2078        let client = reqwest::Client::new();
2079        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2080        assert_eq!(token, None);
2081    }
2082
2083    #[tokio::test]
2084    async fn test_resolve_token_localhost_auto_discover() {
2085        use axum::{Json, Router, routing::get};
2086
2087        let app = Router::new().route(
2088            "/api/v1/auth/token",
2089            get(|| async {
2090                Json(AuthTokenResponse {
2091                    token: "discovered".into(),
2092                })
2093            }),
2094        );
2095        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2096        let addr = listener.local_addr().unwrap();
2097        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2098
2099        let node = format!("localhost:{}", addr.port());
2100        let base = base_url(&node);
2101        let client = reqwest::Client::new();
2102        let token = resolve_token(&client, &base, &node, None).await;
2103        assert_eq!(token, Some("discovered".into()));
2104    }
2105
2106    #[tokio::test]
2107    async fn test_discover_token_empty_returns_none() {
2108        use axum::{Json, Router, routing::get};
2109
2110        let app = Router::new().route(
2111            "/api/v1/auth/token",
2112            get(|| async {
2113                Json(AuthTokenResponse {
2114                    token: String::new(),
2115                })
2116            }),
2117        );
2118        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2119        let addr = listener.local_addr().unwrap();
2120        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2121
2122        let base = format!("http://127.0.0.1:{}", addr.port());
2123        let client = reqwest::Client::new();
2124        assert_eq!(discover_token(&client, &base).await, None);
2125    }
2126
2127    #[tokio::test]
2128    async fn test_discover_token_unreachable_returns_none() {
2129        let client = reqwest::Client::new();
2130        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2131    }
2132
2133    #[test]
2134    fn test_cli_parse_with_token() {
2135        let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
2136        assert_eq!(cli.token, Some("my-secret".into()));
2137    }
2138
2139    #[test]
2140    fn test_cli_parse_without_token() {
2141        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2142        assert_eq!(cli.token, None);
2143    }
2144
2145    #[tokio::test]
2146    async fn test_execute_with_explicit_token_sends_header() {
2147        use axum::{Router, extract::Request, http::StatusCode, routing::get};
2148
2149        let app = Router::new().route(
2150            "/api/v1/sessions",
2151            get(|req: Request| async move {
2152                let auth = req
2153                    .headers()
2154                    .get("authorization")
2155                    .and_then(|v| v.to_str().ok())
2156                    .unwrap_or("");
2157                assert_eq!(auth, "Bearer test-token");
2158                (StatusCode::OK, "[]".to_owned())
2159            }),
2160        );
2161        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2162        let addr = listener.local_addr().unwrap();
2163        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2164        let node = format!("127.0.0.1:{}", addr.port());
2165
2166        let cli = Cli {
2167            node,
2168            token: Some("test-token".into()),
2169            command: Commands::List,
2170        };
2171        let result = execute(&cli).await.unwrap();
2172        assert_eq!(result, "No sessions.");
2173    }
2174
2175    // -- Interventions tests --
2176
2177    #[test]
2178    fn test_cli_parse_interventions() {
2179        let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
2180        assert!(matches!(
2181            &cli.command,
2182            Commands::Interventions { name } if name == "my-session"
2183        ));
2184    }
2185
2186    #[test]
2187    fn test_format_interventions_empty() {
2188        assert_eq!(format_interventions(&[]), "No intervention events.");
2189    }
2190
2191    #[test]
2192    fn test_format_interventions_with_data() {
2193        let events = vec![
2194            InterventionEventResponse {
2195                id: 1,
2196                session_id: "sess-1".into(),
2197                code: None,
2198                reason: "Memory exceeded threshold".into(),
2199                created_at: "2026-01-01T00:00:00Z".into(),
2200            },
2201            InterventionEventResponse {
2202                id: 2,
2203                session_id: "sess-1".into(),
2204                code: None,
2205                reason: "Idle for 10 minutes".into(),
2206                created_at: "2026-01-02T00:00:00Z".into(),
2207            },
2208        ];
2209        let output = format_interventions(&events);
2210        assert!(output.contains("ID"));
2211        assert!(output.contains("TIMESTAMP"));
2212        assert!(output.contains("REASON"));
2213        assert!(output.contains("Memory exceeded threshold"));
2214        assert!(output.contains("Idle for 10 minutes"));
2215        assert!(output.contains("2026-01-01T00:00:00Z"));
2216    }
2217
2218    #[tokio::test]
2219    async fn test_execute_interventions_empty() {
2220        let node = start_test_server().await;
2221        let cli = Cli {
2222            node,
2223            token: None,
2224            command: Commands::Interventions {
2225                name: "my-session".into(),
2226            },
2227        };
2228        let result = execute(&cli).await.unwrap();
2229        assert_eq!(result, "No intervention events.");
2230    }
2231
2232    #[tokio::test]
2233    async fn test_execute_interventions_with_data() {
2234        use axum::{Router, routing::get};
2235
2236        let app = Router::new().route(
2237            "/api/v1/sessions/{id}/interventions",
2238            get(|| async {
2239                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2240                    .to_owned()
2241            }),
2242        );
2243        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2244        let addr = listener.local_addr().unwrap();
2245        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2246        let node = format!("127.0.0.1:{}", addr.port());
2247
2248        let cli = Cli {
2249            node,
2250            token: None,
2251            command: Commands::Interventions {
2252                name: "test".into(),
2253            },
2254        };
2255        let result = execute(&cli).await.unwrap();
2256        assert!(result.contains("OOM"));
2257        assert!(result.contains("2026-01-01T00:00:00Z"));
2258    }
2259
2260    #[tokio::test]
2261    async fn test_execute_interventions_connection_refused() {
2262        let cli = Cli {
2263            node: "localhost:1".into(),
2264            token: None,
2265            command: Commands::Interventions {
2266                name: "test".into(),
2267            },
2268        };
2269        let result = execute(&cli).await;
2270        let err = result.unwrap_err().to_string();
2271        assert!(err.contains("Could not connect to pulpod"));
2272    }
2273
2274    // -- Attach command tests --
2275
2276    #[test]
2277    fn test_build_attach_command() {
2278        let cmd = build_attach_command("my-session");
2279        assert_eq!(cmd.get_program(), "tmux");
2280        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2281        assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
2282    }
2283
2284    #[test]
2285    fn test_cli_parse_attach() {
2286        let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2287        assert!(matches!(
2288            &cli.command,
2289            Commands::Attach { name } if name == "my-session"
2290        ));
2291    }
2292
2293    #[test]
2294    fn test_cli_parse_attach_alias() {
2295        let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2296        assert!(matches!(
2297            &cli.command,
2298            Commands::Attach { name } if name == "my-session"
2299        ));
2300    }
2301
2302    #[tokio::test]
2303    async fn test_execute_attach_success() {
2304        let node = start_test_server().await;
2305        let cli = Cli {
2306            node,
2307            token: None,
2308            command: Commands::Attach {
2309                name: "test-session".into(),
2310            },
2311        };
2312        let result = execute(&cli).await.unwrap();
2313        assert!(result.contains("Detached from session test-session"));
2314    }
2315
2316    #[tokio::test]
2317    async fn test_execute_attach_with_backend_session_id() {
2318        use axum::{Router, routing::get};
2319        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2320        let app = Router::new().route(
2321            "/api/v1/sessions/{id}",
2322            get(move || async move { session_json.to_owned() }),
2323        );
2324        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2325        let addr = listener.local_addr().unwrap();
2326        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2327
2328        let cli = Cli {
2329            node: format!("127.0.0.1:{}", addr.port()),
2330            token: None,
2331            command: Commands::Attach {
2332                name: "my-session".into(),
2333            },
2334        };
2335        let result = execute(&cli).await.unwrap();
2336        assert!(result.contains("Detached from session my-session"));
2337    }
2338
2339    #[tokio::test]
2340    async fn test_execute_attach_connection_refused() {
2341        let cli = Cli {
2342            node: "localhost:1".into(),
2343            token: None,
2344            command: Commands::Attach {
2345                name: "test-session".into(),
2346            },
2347        };
2348        let result = execute(&cli).await;
2349        let err = result.unwrap_err().to_string();
2350        assert!(err.contains("Could not connect to pulpod"));
2351    }
2352
2353    #[tokio::test]
2354    async fn test_execute_attach_error_response() {
2355        use axum::{Router, http::StatusCode, routing::get};
2356        let app = Router::new().route(
2357            "/api/v1/sessions/{id}",
2358            get(|| async {
2359                (
2360                    StatusCode::NOT_FOUND,
2361                    r#"{"error":"session not found"}"#.to_owned(),
2362                )
2363            }),
2364        );
2365        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2366        let addr = listener.local_addr().unwrap();
2367        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2368
2369        let cli = Cli {
2370            node: format!("127.0.0.1:{}", addr.port()),
2371            token: None,
2372            command: Commands::Attach {
2373                name: "nonexistent".into(),
2374            },
2375        };
2376        let result = execute(&cli).await;
2377        let err = result.unwrap_err().to_string();
2378        assert!(err.contains("session not found"));
2379    }
2380
2381    #[tokio::test]
2382    async fn test_execute_attach_stale_session() {
2383        use axum::{Router, routing::get};
2384        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2385        let app = Router::new().route(
2386            "/api/v1/sessions/{id}",
2387            get(move || async move { session_json.to_owned() }),
2388        );
2389        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2390        let addr = listener.local_addr().unwrap();
2391        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2392
2393        let cli = Cli {
2394            node: format!("127.0.0.1:{}", addr.port()),
2395            token: None,
2396            command: Commands::Attach {
2397                name: "stale-sess".into(),
2398            },
2399        };
2400        let result = execute(&cli).await;
2401        let err = result.unwrap_err().to_string();
2402        assert!(err.contains("lost"));
2403        assert!(err.contains("pulpo resume"));
2404    }
2405
2406    #[tokio::test]
2407    async fn test_execute_attach_dead_session() {
2408        use axum::{Router, routing::get};
2409        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2410        let app = Router::new().route(
2411            "/api/v1/sessions/{id}",
2412            get(move || async move { session_json.to_owned() }),
2413        );
2414        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2415        let addr = listener.local_addr().unwrap();
2416        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2417
2418        let cli = Cli {
2419            node: format!("127.0.0.1:{}", addr.port()),
2420            token: None,
2421            command: Commands::Attach {
2422                name: "dead-sess".into(),
2423            },
2424        };
2425        let result = execute(&cli).await;
2426        let err = result.unwrap_err().to_string();
2427        assert!(err.contains("killed"));
2428        assert!(err.contains("cannot attach"));
2429    }
2430
2431    // -- Alias parse tests --
2432
2433    #[test]
2434    fn test_cli_parse_alias_spawn() {
2435        let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
2436        assert!(matches!(&cli.command, Commands::Spawn { .. }));
2437    }
2438
2439    #[test]
2440    fn test_cli_parse_alias_list() {
2441        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2442        assert!(matches!(cli.command, Commands::List));
2443    }
2444
2445    #[test]
2446    fn test_cli_parse_alias_logs() {
2447        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2448        assert!(matches!(
2449            &cli.command,
2450            Commands::Logs { name, .. } if name == "my-session"
2451        ));
2452    }
2453
2454    #[test]
2455    fn test_cli_parse_alias_kill() {
2456        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2457        assert!(matches!(
2458            &cli.command,
2459            Commands::Kill { name } if name == "my-session"
2460        ));
2461    }
2462
2463    #[test]
2464    fn test_cli_parse_alias_delete() {
2465        let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2466        assert!(matches!(
2467            &cli.command,
2468            Commands::Delete { name } if name == "my-session"
2469        ));
2470    }
2471
2472    #[test]
2473    fn test_cli_parse_alias_resume() {
2474        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
2475        assert!(matches!(
2476            &cli.command,
2477            Commands::Resume { name } if name == "my-session"
2478        ));
2479    }
2480
2481    #[test]
2482    fn test_cli_parse_alias_nodes() {
2483        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
2484        assert!(matches!(cli.command, Commands::Nodes));
2485    }
2486
2487    #[test]
2488    fn test_cli_parse_alias_interventions() {
2489        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
2490        assert!(matches!(
2491            &cli.command,
2492            Commands::Interventions { name } if name == "my-session"
2493        ));
2494    }
2495
2496    #[test]
2497    fn test_api_error_json() {
2498        let err = api_error("{\"error\":\"session not found: foo\"}");
2499        assert_eq!(err.to_string(), "session not found: foo");
2500    }
2501
2502    #[test]
2503    fn test_api_error_plain_text() {
2504        let err = api_error("plain text error");
2505        assert_eq!(err.to_string(), "plain text error");
2506    }
2507
2508    // -- diff_output tests --
2509
2510    #[test]
2511    fn test_diff_output_empty_prev() {
2512        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
2513    }
2514
2515    #[test]
2516    fn test_diff_output_identical() {
2517        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
2518    }
2519
2520    #[test]
2521    fn test_diff_output_new_lines_appended() {
2522        let prev = "line1\nline2";
2523        let new = "line1\nline2\nline3\nline4";
2524        assert_eq!(diff_output(prev, new), "line3\nline4");
2525    }
2526
2527    #[test]
2528    fn test_diff_output_scrolled_window() {
2529        // Window of 3 lines: old lines scroll off top, new appear at bottom
2530        let prev = "line1\nline2\nline3";
2531        let new = "line2\nline3\nline4";
2532        assert_eq!(diff_output(prev, new), "line4");
2533    }
2534
2535    #[test]
2536    fn test_diff_output_completely_different() {
2537        let prev = "aaa\nbbb";
2538        let new = "xxx\nyyy";
2539        assert_eq!(diff_output(prev, new), "xxx\nyyy");
2540    }
2541
2542    #[test]
2543    fn test_diff_output_last_line_matches_but_overlap_fails() {
2544        // Last line of prev appears in new but preceding lines don't match
2545        let prev = "aaa\ncommon";
2546        let new = "zzz\ncommon\nnew_line";
2547        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
2548        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
2549        // Falls through, no verified overlap, so returns everything
2550        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
2551    }
2552
2553    #[test]
2554    fn test_diff_output_new_empty() {
2555        assert_eq!(diff_output("line1", ""), "");
2556    }
2557
2558    // -- follow_logs tests --
2559
2560    /// Start a test server that simulates evolving output and session status transitions.
2561    async fn start_follow_test_server() -> String {
2562        use axum::{Router, extract::Path, extract::Query, routing::get};
2563        use std::sync::Arc;
2564        use std::sync::atomic::{AtomicUsize, Ordering};
2565
2566        let call_count = Arc::new(AtomicUsize::new(0));
2567        let output_count = call_count.clone();
2568        let status_count = Arc::new(AtomicUsize::new(0));
2569        let status_count_inner = status_count.clone();
2570
2571        let app = Router::new()
2572            .route(
2573                "/api/v1/sessions/{id}/output",
2574                get(
2575                    move |_path: Path<String>,
2576                          _query: Query<std::collections::HashMap<String, String>>| {
2577                        let count = output_count.clone();
2578                        async move {
2579                            let n = count.fetch_add(1, Ordering::SeqCst);
2580                            let output = match n {
2581                                0 => "line1\nline2".to_owned(),
2582                                1 => "line1\nline2\nline3".to_owned(),
2583                                _ => "line2\nline3\nline4".to_owned(),
2584                            };
2585                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2586                        }
2587                    },
2588                ),
2589            )
2590            .route(
2591                "/api/v1/sessions/{id}",
2592                get(move |_path: Path<String>| {
2593                    let count = status_count_inner.clone();
2594                    async move {
2595                        let n = count.fetch_add(1, Ordering::SeqCst);
2596                        let status = if n < 2 { "active" } else { "ready" };
2597                        format!(
2598                            r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"{status}","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
2599                        )
2600                    }
2601                }),
2602            );
2603
2604        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2605        let addr = listener.local_addr().unwrap();
2606        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2607        format!("http://127.0.0.1:{}", addr.port())
2608    }
2609
2610    #[tokio::test]
2611    async fn test_follow_logs_polls_and_exits_on_completed() {
2612        let base = start_follow_test_server().await;
2613        let client = reqwest::Client::new();
2614        let mut buf = Vec::new();
2615
2616        follow_logs(&client, &base, "test", 100, None, &mut buf)
2617            .await
2618            .unwrap();
2619
2620        let output = String::from_utf8(buf).unwrap();
2621        // Should contain initial output + new lines
2622        assert!(output.contains("line1"));
2623        assert!(output.contains("line2"));
2624        assert!(output.contains("line3"));
2625        assert!(output.contains("line4"));
2626    }
2627
2628    #[tokio::test]
2629    async fn test_execute_logs_follow_success() {
2630        let base = start_follow_test_server().await;
2631        // Extract host:port from http://127.0.0.1:PORT
2632        let node = base.strip_prefix("http://").unwrap().to_owned();
2633
2634        let cli = Cli {
2635            node,
2636            token: None,
2637            command: Commands::Logs {
2638                name: "test".into(),
2639                lines: 100,
2640                follow: true,
2641            },
2642        };
2643        // execute() with follow writes to stdout and returns empty string
2644        let result = execute(&cli).await.unwrap();
2645        assert_eq!(result, "");
2646    }
2647
2648    #[tokio::test]
2649    async fn test_execute_logs_follow_connection_refused() {
2650        let cli = Cli {
2651            node: "localhost:1".into(),
2652            token: None,
2653            command: Commands::Logs {
2654                name: "test".into(),
2655                lines: 50,
2656                follow: true,
2657            },
2658        };
2659        let result = execute(&cli).await;
2660        let err = result.unwrap_err().to_string();
2661        assert!(
2662            err.contains("Could not connect to pulpod"),
2663            "Expected friendly error, got: {err}"
2664        );
2665    }
2666
2667    #[tokio::test]
2668    async fn test_follow_logs_exits_on_dead() {
2669        use axum::{Router, extract::Path, extract::Query, routing::get};
2670
2671        let app = Router::new()
2672            .route(
2673                "/api/v1/sessions/{id}/output",
2674                get(
2675                    |_path: Path<String>,
2676                     _query: Query<std::collections::HashMap<String, String>>| async {
2677                        r#"{"output":"some output"}"#.to_owned()
2678                    },
2679                ),
2680            )
2681            .route(
2682                "/api/v1/sessions/{id}",
2683                get(|_path: Path<String>| async {
2684                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2685                }),
2686            );
2687
2688        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2689        let addr = listener.local_addr().unwrap();
2690        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2691        let base = format!("http://127.0.0.1:{}", addr.port());
2692
2693        let client = reqwest::Client::new();
2694        let mut buf = Vec::new();
2695        follow_logs(&client, &base, "test", 100, None, &mut buf)
2696            .await
2697            .unwrap();
2698
2699        let output = String::from_utf8(buf).unwrap();
2700        assert!(output.contains("some output"));
2701    }
2702
2703    #[tokio::test]
2704    async fn test_follow_logs_exits_on_stale() {
2705        use axum::{Router, extract::Path, extract::Query, routing::get};
2706
2707        let app = Router::new()
2708            .route(
2709                "/api/v1/sessions/{id}/output",
2710                get(
2711                    |_path: Path<String>,
2712                     _query: Query<std::collections::HashMap<String, String>>| async {
2713                        r#"{"output":"stale output"}"#.to_owned()
2714                    },
2715                ),
2716            )
2717            .route(
2718                "/api/v1/sessions/{id}",
2719                get(|_path: Path<String>| async {
2720                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2721                }),
2722            );
2723
2724        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2725        let addr = listener.local_addr().unwrap();
2726        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2727        let base = format!("http://127.0.0.1:{}", addr.port());
2728
2729        let client = reqwest::Client::new();
2730        let mut buf = Vec::new();
2731        follow_logs(&client, &base, "test", 100, None, &mut buf)
2732            .await
2733            .unwrap();
2734
2735        let output = String::from_utf8(buf).unwrap();
2736        assert!(output.contains("stale output"));
2737    }
2738
2739    #[tokio::test]
2740    async fn test_execute_logs_follow_non_reqwest_error() {
2741        use axum::{Router, extract::Path, extract::Query, routing::get};
2742
2743        // Session status endpoint returns invalid JSON to trigger a serde error
2744        let app = Router::new()
2745            .route(
2746                "/api/v1/sessions/{id}/output",
2747                get(
2748                    |_path: Path<String>,
2749                     _query: Query<std::collections::HashMap<String, String>>| async {
2750                        r#"{"output":"initial"}"#.to_owned()
2751                    },
2752                ),
2753            )
2754            .route(
2755                "/api/v1/sessions/{id}",
2756                get(|_path: Path<String>| async { "not valid json".to_owned() }),
2757            );
2758
2759        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2760        let addr = listener.local_addr().unwrap();
2761        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2762        let node = format!("127.0.0.1:{}", addr.port());
2763
2764        let cli = Cli {
2765            node,
2766            token: None,
2767            command: Commands::Logs {
2768                name: "test".into(),
2769                lines: 100,
2770                follow: true,
2771            },
2772        };
2773        let err = execute(&cli).await.unwrap_err();
2774        // serde_json error, not a reqwest error — hits the Err(other) branch
2775        let msg = err.to_string();
2776        assert!(
2777            msg.contains("expected ident"),
2778            "Expected serde parse error, got: {msg}"
2779        );
2780    }
2781
2782    #[tokio::test]
2783    async fn test_fetch_session_status_connection_error() {
2784        let client = reqwest::Client::new();
2785        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
2786        assert!(result.is_err());
2787    }
2788
2789    // -- Crontab wrapper tests --
2790
2791    #[test]
2792    fn test_build_crontab_line() {
2793        let line = build_crontab_line(
2794            "nightly-review",
2795            "0 3 * * *",
2796            "/home/me/repo",
2797            "claude -p 'Review PRs'",
2798            "localhost:7433",
2799        );
2800        assert_eq!(
2801            line,
2802            "0 3 * * * pulpo --node localhost:7433 spawn nightly-review --workdir /home/me/repo -- claude -p 'Review PRs' #pulpo:nightly-review\n"
2803        );
2804    }
2805
2806    #[test]
2807    fn test_crontab_install_success() {
2808        let crontab = "# existing cron\n0 * * * * echo hi\n";
2809        let line = "0 3 * * * pulpo --node n spawn my-job --workdir /tmp -- claude -p task #pulpo:my-job\n";
2810        let result = crontab_install(crontab, "my-job", line).unwrap();
2811        assert!(result.starts_with("# existing cron\n"));
2812        assert!(result.ends_with("#pulpo:my-job\n"));
2813        assert!(result.contains("echo hi"));
2814    }
2815
2816    #[test]
2817    fn test_crontab_install_duplicate_error() {
2818        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
2819        let line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
2820        let err = crontab_install(crontab, "my-job", line).unwrap_err();
2821        assert!(err.to_string().contains("already exists"));
2822    }
2823
2824    #[test]
2825    fn test_crontab_list_empty() {
2826        assert_eq!(crontab_list(""), "No pulpo schedules.");
2827    }
2828
2829    #[test]
2830    fn test_crontab_list_no_pulpo_entries() {
2831        assert_eq!(crontab_list("0 * * * * echo hi\n"), "No pulpo schedules.");
2832    }
2833
2834    #[test]
2835    fn test_crontab_list_with_entries() {
2836        let crontab = "0 3 * * * pulpo --node n spawn nightly --workdir /tmp -- claude -p task #pulpo:nightly\n";
2837        let output = crontab_list(crontab);
2838        assert!(output.contains("NAME"));
2839        assert!(output.contains("CRON"));
2840        assert!(output.contains("PAUSED"));
2841        assert!(output.contains("nightly"));
2842        assert!(output.contains("0 3 * * *"));
2843        assert!(output.contains("no"));
2844    }
2845
2846    #[test]
2847    fn test_crontab_list_paused_entry() {
2848        let crontab = "#0 3 * * * pulpo spawn task #pulpo:paused-job\n";
2849        let output = crontab_list(crontab);
2850        assert!(output.contains("paused-job"));
2851        assert!(output.contains("yes"));
2852    }
2853
2854    #[test]
2855    fn test_crontab_list_short_line() {
2856        // A line with fewer than 5 space-separated parts but still tagged
2857        let crontab = "badcron #pulpo:broken\n";
2858        let output = crontab_list(crontab);
2859        assert!(output.contains("broken"));
2860        assert!(output.contains('?'));
2861    }
2862
2863    #[test]
2864    fn test_crontab_remove_success() {
2865        let crontab = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
2866        let result = crontab_remove(crontab, "my-job").unwrap();
2867        assert!(result.contains("echo hi"));
2868        assert!(!result.contains("my-job"));
2869    }
2870
2871    #[test]
2872    fn test_crontab_remove_not_found() {
2873        let crontab = "0 * * * * echo hi\n";
2874        let err = crontab_remove(crontab, "ghost").unwrap_err();
2875        assert!(err.to_string().contains("not found"));
2876    }
2877
2878    #[test]
2879    fn test_crontab_pause_success() {
2880        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
2881        let result = crontab_pause(crontab, "my-job").unwrap();
2882        assert!(result.starts_with('#'));
2883        assert!(result.contains("#pulpo:my-job"));
2884    }
2885
2886    #[test]
2887    fn test_crontab_pause_not_found() {
2888        let crontab = "0 * * * * echo hi\n";
2889        let err = crontab_pause(crontab, "ghost").unwrap_err();
2890        assert!(err.to_string().contains("not found or already paused"));
2891    }
2892
2893    #[test]
2894    fn test_crontab_pause_already_paused() {
2895        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
2896        let err = crontab_pause(crontab, "my-job").unwrap_err();
2897        assert!(err.to_string().contains("already paused"));
2898    }
2899
2900    #[test]
2901    fn test_crontab_resume_success() {
2902        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
2903        let result = crontab_resume(crontab, "my-job").unwrap();
2904        assert!(!result.starts_with('#'));
2905        assert!(result.contains("#pulpo:my-job"));
2906    }
2907
2908    #[test]
2909    fn test_crontab_resume_not_found() {
2910        let crontab = "0 * * * * echo hi\n";
2911        let err = crontab_resume(crontab, "ghost").unwrap_err();
2912        assert!(err.to_string().contains("not found or not paused"));
2913    }
2914
2915    #[test]
2916    fn test_crontab_resume_not_paused() {
2917        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
2918        let err = crontab_resume(crontab, "my-job").unwrap_err();
2919        assert!(err.to_string().contains("not paused"));
2920    }
2921
2922    // -- Schedule CLI parse tests --
2923
2924    #[test]
2925    fn test_cli_parse_schedule_install() {
2926        let cli = Cli::try_parse_from([
2927            "pulpo",
2928            "schedule",
2929            "install",
2930            "nightly",
2931            "0 3 * * *",
2932            "--workdir",
2933            "/tmp/repo",
2934            "--",
2935            "claude",
2936            "-p",
2937            "Review PRs",
2938        ])
2939        .unwrap();
2940        assert!(matches!(
2941            &cli.command,
2942            Commands::Schedule {
2943                action: ScheduleAction::Install { name, cron, workdir, command }
2944            } if name == "nightly" && cron == "0 3 * * *" && workdir == "/tmp/repo"
2945              && command == &["claude", "-p", "Review PRs"]
2946        ));
2947    }
2948
2949    #[test]
2950    fn test_cli_parse_schedule_list() {
2951        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
2952        assert!(matches!(
2953            &cli.command,
2954            Commands::Schedule {
2955                action: ScheduleAction::List
2956            }
2957        ));
2958    }
2959
2960    #[test]
2961    fn test_cli_parse_schedule_remove() {
2962        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
2963        assert!(matches!(
2964            &cli.command,
2965            Commands::Schedule {
2966                action: ScheduleAction::Remove { name }
2967            } if name == "nightly"
2968        ));
2969    }
2970
2971    #[test]
2972    fn test_cli_parse_schedule_pause() {
2973        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
2974        assert!(matches!(
2975            &cli.command,
2976            Commands::Schedule {
2977                action: ScheduleAction::Pause { name }
2978            } if name == "nightly"
2979        ));
2980    }
2981
2982    #[test]
2983    fn test_cli_parse_schedule_resume() {
2984        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
2985        assert!(matches!(
2986            &cli.command,
2987            Commands::Schedule {
2988                action: ScheduleAction::Resume { name }
2989            } if name == "nightly"
2990        ));
2991    }
2992
2993    #[test]
2994    fn test_cli_parse_schedule_alias() {
2995        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
2996        assert!(matches!(
2997            &cli.command,
2998            Commands::Schedule {
2999                action: ScheduleAction::List
3000            }
3001        ));
3002    }
3003
3004    #[test]
3005    fn test_cli_parse_schedule_list_alias() {
3006        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3007        assert!(matches!(
3008            &cli.command,
3009            Commands::Schedule {
3010                action: ScheduleAction::List
3011            }
3012        ));
3013    }
3014
3015    #[test]
3016    fn test_cli_parse_schedule_remove_alias() {
3017        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3018        assert!(matches!(
3019            &cli.command,
3020            Commands::Schedule {
3021                action: ScheduleAction::Remove { name }
3022            } if name == "nightly"
3023        ));
3024    }
3025
3026    #[tokio::test]
3027    async fn test_execute_schedule_via_execute() {
3028        // Under coverage builds, execute_schedule is a stub returning Ok("")
3029        let node = start_test_server().await;
3030        let cli = Cli {
3031            node,
3032            token: None,
3033            command: Commands::Schedule {
3034                action: ScheduleAction::List,
3035            },
3036        };
3037        let result = execute(&cli).await;
3038        // Under coverage: succeeds with empty string; under non-coverage: may fail (no crontab)
3039        assert!(result.is_ok() || result.is_err());
3040    }
3041
3042    #[test]
3043    fn test_schedule_action_debug() {
3044        let action = ScheduleAction::List;
3045        assert_eq!(format!("{action:?}"), "List");
3046    }
3047}