Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4    AuthTokenResponse, CreateSessionResponse, InterventionEventResponse, KnowledgeDeleteResponse,
5    KnowledgeItemResponse, KnowledgePushResponse, KnowledgeResponse, PeersResponse,
6};
7use pulpo_common::knowledge::Knowledge;
8use pulpo_common::session::Session;
9
// Top-level command-line arguments: two global flags plus exactly one subcommand.
// NOTE(review): the `///` comments on the fields are presumably surfaced by clap as
// `--help` text, so they are left untouched here — confirm before rewording them.
10#[derive(Parser, Debug)]
11#[command(
12    name = "pulpo",
13    about = "Manage agent sessions across your machines",
14    version
15)]
16pub struct Cli {
17    /// Target node (default: localhost)
    // The actual default is the `host:port` string "localhost:7433"; `execute` passes
    // this value to `base_url`, which prefixes it with `http://`.
18    #[arg(long, default_value = "localhost:7433")]
19    pub node: String,
20
21    /// Auth token (auto-discovered from local daemon if omitted)
    // `resolve_token` prefers this value; without it, discovery is attempted only
    // when `is_localhost(node)` is true.
22    #[arg(long)]
23    pub token: Option<String>,
24
    // The subcommand selected on the command line.
25    #[command(subcommand)]
26    pub command: Commands,
27}
28
// All top-level subcommands of the CLI; `execute` matches on this enum.
// NOTE(review): the `///` comments are presumably rendered by clap as help text,
// so only plain `//` notes are added below.
29#[derive(Subcommand, Debug)]
// NOTE(review): presumably needed because `Spawn` carries far more fields than the
// other variants — confirm that is the variant triggering the lint.
30#[allow(clippy::large_enum_variant)]
31pub enum Commands {
32    /// Attach to a session's terminal
33    #[command(visible_alias = "a")]
34    Attach {
35        /// Session name or ID
36        name: String,
37    },
38
39    /// Send input to a session
40    #[command(visible_alias = "i")]
41    Input {
42        /// Session name or ID
43        name: String,
44        /// Text to send (sends Enter if omitted)
        // When omitted, `execute` sends the string "\n" as the input text.
45        text: Option<String>,
46    },
47
48    /// Spawn a new agent session
49    #[command(visible_alias = "s")]
50    Spawn {
51        /// Working directory (defaults to current directory)
        // `execute` falls back to "." if the current directory cannot be read.
52        #[arg(long)]
53        workdir: Option<String>,
54
55        /// Session name (auto-generated if omitted)
56        #[arg(long)]
57        name: Option<String>,
58
59        /// Agent provider (claude, codex, gemini, opencode). Uses config `default_provider` or claude.
        // Only sent in the request body when given explicitly.
60        #[arg(long)]
61        provider: Option<String>,
62
63        /// Run in autonomous mode (fire-and-forget)
        // Selects the request's `mode`: "autonomous" when set, "interactive" otherwise.
64        #[arg(long)]
65        auto: bool,
66
67        /// Disable all safety guardrails (Claude, Gemini)
68        #[arg(long)]
69        unrestricted: bool,
70
71        /// Model override, e.g. opus, sonnet (Claude, Codex, Gemini)
72        #[arg(long)]
73        model: Option<String>,
74
75        /// System prompt to append (Claude only)
76        #[arg(long)]
77        system_prompt: Option<String>,
78
79        /// Explicit allowed tools, comma-separated (Claude only)
80        #[arg(long, value_delimiter = ',')]
81        allowed_tools: Option<Vec<String>>,
82
83        /// Ink name (from config)
84        #[arg(long)]
85        ink: Option<String>,
86
87        /// Maximum agent turns before stopping (Claude only)
88        #[arg(long)]
89        max_turns: Option<u32>,
90
91        /// Maximum budget in USD before stopping (Claude only)
        // Sent to the API under the JSON key `max_budget_usd`.
92        #[arg(long)]
93        max_budget: Option<f64>,
94
95        /// Output format, e.g. json, stream-json (Claude, Gemini, opencode)
96        #[arg(long)]
97        output_format: Option<String>,
98
99        /// Task prompt
        // Positional words; `execute` joins them with single spaces and omits the
        // `prompt` key entirely when the result is empty.
100        prompt: Vec<String>,
101    },
102
103    /// List all sessions
104    #[command(visible_alias = "ls")]
105    List,
106
107    /// Show session logs/output
108    #[command(visible_alias = "l")]
109    Logs {
110        /// Session name or ID
111        name: String,
112
113        /// Number of lines to fetch
        // Forwarded as the `lines` query parameter of the output endpoint.
114        #[arg(long, default_value = "100")]
115        lines: usize,
116
117        /// Follow output (like `tail -f`)
        // Switches `execute` from a single `fetch_output` call to `follow_logs`.
118        #[arg(short, long)]
119        follow: bool,
120    },
121
122    /// Kill a session
123    #[command(visible_alias = "k")]
124    Kill {
125        /// Session name or ID
126        name: String,
127    },
128
129    /// Permanently remove a session from history
130    #[command(visible_alias = "rm")]
131    Delete {
132        /// Session name or ID
133        name: String,
134    },
135
136    /// Resume a stale session
137    #[command(visible_alias = "r")]
138    Resume {
139        /// Session name or ID
140        name: String,
141    },
142
143    /// List all known nodes
144    #[command(visible_alias = "n")]
145    Nodes,
146
147    /// Show intervention history for a session
148    #[command(visible_alias = "iv")]
149    Interventions {
150        /// Session name or ID
151        name: String,
152    },
153
154    /// Open the web dashboard in your browser
155    Ui,
156
157    /// Query extracted knowledge from past sessions
    // NOTE(review): how the filter flags and the `--get`/`--delete`/`--push` actions
    // interact is decided by the handler, which is not shown here — verify there.
158    #[command(visible_alias = "kn")]
159    Knowledge {
160        /// Filter by session ID
161        #[arg(long)]
162        session: Option<String>,
163
164        /// Filter by kind (summary, failure)
165        #[arg(long)]
166        kind: Option<String>,
167
168        /// Filter by repo/workdir
169        #[arg(long)]
170        repo: Option<String>,
171
172        /// Filter by ink name
173        #[arg(long)]
174        ink: Option<String>,
175
176        /// Maximum results
177        #[arg(long, default_value = "20")]
178        limit: usize,
179
180        /// Context mode: find relevant knowledge for a workdir
181        #[arg(long)]
182        context: bool,
183
184        /// Get a single knowledge item by ID
185        #[arg(long)]
186        get: Option<String>,
187
188        /// Delete a knowledge item by ID
189        #[arg(long)]
190        delete: Option<String>,
191
192        /// Push local knowledge to configured remote
193        #[arg(long)]
194        push: bool,
195    },
196
197    /// Manage scheduled agent runs via crontab
    // The nested action is a `ScheduleAction`, the type `execute_schedule` accepts.
198    #[command(visible_alias = "sched")]
199    Schedule {
200        #[command(subcommand)]
201        action: ScheduleAction,
202    },
203}
204
// Actions of the `schedule` subcommand. `execute_schedule` reads the user's crontab,
// applies the matching `crontab_*` helper and (except for `List`) writes it back.
// Unlike `Commands`, the shortcuts here use `alias` rather than `visible_alias`.
205#[derive(Subcommand, Debug)]
206pub enum ScheduleAction {
207    /// Install a cron schedule that spawns a session
    // Turned into a single crontab line by `build_crontab_line`.
208    Install {
209        /// Schedule name
        // Stored on the crontab line after the `#pulpo:` tag; the other actions
        // look entries up by this name.
210        name: String,
211        /// Cron expression (e.g. "0 3 * * *")
212        cron: String,
213        /// Working directory
        // Required here, unlike `Commands::Spawn` where it is optional.
214        #[arg(long)]
215        workdir: String,
216        /// Agent provider
217        #[arg(long, default_value = "claude")]
218        provider: String,
219        /// Task prompt
        // Positional words, joined with single spaces before the line is built.
220        prompt: Vec<String>,
221    },
222    /// List installed pulpo cron schedules
223    #[command(alias = "ls")]
224    List,
225    /// Remove a cron schedule
226    #[command(alias = "rm")]
227    Remove {
228        /// Schedule name
229        name: String,
230    },
231    /// Pause a cron schedule (comments out the line)
232    Pause {
233        /// Schedule name
234        name: String,
235    },
236    /// Resume a paused cron schedule (uncomments the line)
237    Resume {
238        /// Schedule name
239        name: String,
240    },
241}
242
/// Build the HTTP base URL for a node address such as `localhost:7433`.
///
/// The scheme is always plain `http://`; the address is appended verbatim.
pub fn base_url(node: &str) -> String {
    ["http://", node].concat()
}
247
248/// Response shape for the output endpoint.
// Parsed by `fetch_output` from the JSON body of `/api/v1/sessions/{name}/output`.
249#[derive(serde::Deserialize)]
250struct OutputResponse {
    // The session's output text; `fetch_output` returns it unchanged.
251    output: String,
252}
253
254/// Format a list of sessions as a table.
255fn format_sessions(sessions: &[Session]) -> String {
256    if sessions.is_empty() {
257        return "No sessions.".into();
258    }
259    let mut lines = vec![format!(
260        "{:<20} {:<12} {:<10} {:<14} {}",
261        "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
262    )];
263    for s in sessions {
264        let prompt_display = if s.prompt.len() > 40 {
265            format!("{}...", &s.prompt[..37])
266        } else {
267            s.prompt.clone()
268        };
269        let status_display = if s.waiting_for_input {
270            "waiting".to_owned()
271        } else {
272            s.status.to_string()
273        };
274        lines.push(format!(
275            "{:<20} {:<12} {:<10} {:<14} {}",
276            s.name, status_display, s.provider, s.mode, prompt_display
277        ));
278    }
279    lines.join("\n")
280}
281
282/// Format the peers response as a table.
283fn format_nodes(resp: &PeersResponse) -> String {
284    let mut lines = vec![format!(
285        "{:<20} {:<25} {:<10} {}",
286        "NAME", "ADDRESS", "STATUS", "SESSIONS"
287    )];
288    lines.push(format!(
289        "{:<20} {:<25} {:<10} {}",
290        resp.local.name, "(local)", "online", "-"
291    ));
292    for p in &resp.peers {
293        let sessions = p
294            .session_count
295            .map_or_else(|| "-".into(), |c| c.to_string());
296        lines.push(format!(
297            "{:<20} {:<25} {:<10} {}",
298            p.name, p.address, p.status, sessions
299        ));
300    }
301    lines.join("\n")
302}
303
304/// Format intervention events as a table.
305fn format_interventions(events: &[InterventionEventResponse]) -> String {
306    if events.is_empty() {
307        return "No intervention events.".into();
308    }
309    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
310    for e in events {
311        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
312    }
313    lines.join("\n")
314}
315
316/// Format knowledge items as a table.
317fn format_knowledge(items: &[Knowledge]) -> String {
318    if items.is_empty() {
319        return "No knowledge found.".into();
320    }
321    let mut lines = vec![format!(
322        "{:<10} {:<40} {:<10} {:<6} {}",
323        "KIND", "TITLE", "REPO", "REL", "TAGS"
324    )];
325    for k in items {
326        let title = if k.title.len() > 38 {
327            format!("{}…", &k.title[..37])
328        } else {
329            k.title.clone()
330        };
331        let repo = k
332            .scope_repo
333            .as_deref()
334            .and_then(|r| r.rsplit('/').next())
335            .unwrap_or("-");
336        let tags = k.tags.join(",");
337        lines.push(format!(
338            "{:<10} {:<40} {:<10} {:<6.2} {}",
339            k.kind, title, repo, k.relevance, tags
340        ));
341    }
342    lines.join("\n")
343}
344
/// Prepare (without running) the command that opens `url` in the default browser.
///
/// macOS uses `open`; Linux and every other target use `xdg-open`.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    let launcher = if cfg!(target_os = "macos") {
        "open"
    } else {
        // Linux, and the fallback for any other platform.
        "xdg-open"
    };
    let mut command = std::process::Command::new(launcher);
    command.arg(url);
    command
}
368
369/// Open a URL in the default browser.
370#[cfg(not(coverage))]
371fn open_browser(url: &str) -> Result<()> {
372    build_open_command(url).status()?;
373    Ok(())
374}
375
376/// Stub for coverage builds — avoids opening a browser during tests.
377#[cfg(coverage)]
378fn open_browser(_url: &str) -> Result<()> {
379    Ok(())
380}
381
/// Prepare (without running) `tmux attach-session -t <backend_session_id>`.
///
/// `backend_session_id` is the tmux session name, e.g. `my-session`.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    let mut tmux = std::process::Command::new("tmux");
    tmux.arg("attach-session").arg("-t").arg(backend_session_id);
    tmux
}
390
391/// Attach to a session's terminal.
392#[cfg(not(any(test, coverage)))]
393fn attach_session(backend_session_id: &str) -> Result<()> {
394    let status = build_attach_command(backend_session_id).status()?;
395    if !status.success() {
396        anyhow::bail!("attach failed with {status}");
397    }
398    Ok(())
399}
400
401/// Stub for test and coverage builds — avoids spawning real terminals during tests.
402#[cfg(any(test, coverage))]
403#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
404fn attach_session(_backend_session_id: &str) -> Result<()> {
405    Ok(())
406}
407
408/// Extract a clean error message from an API JSON response (or fall back to raw text).
409fn api_error(text: &str) -> anyhow::Error {
410    serde_json::from_str::<serde_json::Value>(text)
411        .ok()
412        .and_then(|v| v["error"].as_str().map(String::from))
413        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
414}
415
416/// Return the response body text, or a clean error if the response was non-success.
417async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
418    if resp.status().is_success() {
419        Ok(resp.text().await?)
420    } else {
421        let text = resp.text().await?;
422        Err(api_error(&text))
423    }
424}
425
426/// Map a reqwest error to a user-friendly message.
427fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
428    if err.is_connect() {
429        anyhow::anyhow!(
430            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
431        )
432    } else {
433        anyhow::anyhow!("Network error connecting to {node}: {err}")
434    }
435}
436
/// Report whether `node` (a `host` or `host:port` string) refers to this machine.
///
/// Recognised forms: `localhost`, `127.0.0.1`, the bare IPv6 loopback `::1`,
/// and anything starting with the bracketed form `[::1]`.
fn is_localhost(node: &str) -> bool {
    // Everything before the first ':' is the host part (the whole string if none).
    let host = match node.split_once(':') {
        Some((before_colon, _)) => before_colon,
        None => node,
    };
    matches!(host, "localhost" | "127.0.0.1") || node == "::1" || node.starts_with("[::1]")
}
442
443/// Try to auto-discover the auth token from a local daemon.
444async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
445    let resp = client
446        .get(format!("{base}/api/v1/auth/token"))
447        .send()
448        .await
449        .ok()?;
450    let body: AuthTokenResponse = resp.json().await.ok()?;
451    if body.token.is_empty() {
452        None
453    } else {
454        Some(body.token)
455    }
456}
457
458/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
459async fn resolve_token(
460    client: &reqwest::Client,
461    base: &str,
462    node: &str,
463    explicit: Option<&str>,
464) -> Option<String> {
465    if let Some(t) = explicit {
466        return Some(t.to_owned());
467    }
468    if is_localhost(node) {
469        return discover_token(client, base).await;
470    }
471    None
472}
473
474/// Build an authenticated GET request.
475fn authed_get(
476    client: &reqwest::Client,
477    url: String,
478    token: Option<&str>,
479) -> reqwest::RequestBuilder {
480    let req = client.get(url);
481    if let Some(t) = token {
482        req.bearer_auth(t)
483    } else {
484        req
485    }
486}
487
488/// Build an authenticated POST request.
489fn authed_post(
490    client: &reqwest::Client,
491    url: String,
492    token: Option<&str>,
493) -> reqwest::RequestBuilder {
494    let req = client.post(url);
495    if let Some(t) = token {
496        req.bearer_auth(t)
497    } else {
498        req
499    }
500}
501
502/// Build an authenticated DELETE request.
503fn authed_delete(
504    client: &reqwest::Client,
505    url: String,
506    token: Option<&str>,
507) -> reqwest::RequestBuilder {
508    let req = client.delete(url);
509    if let Some(t) = token {
510        req.bearer_auth(t)
511    } else {
512        req
513    }
514}
515
516/// Fetch session output from the API.
517async fn fetch_output(
518    client: &reqwest::Client,
519    base: &str,
520    name: &str,
521    lines: usize,
522    token: Option<&str>,
523) -> Result<String> {
524    let resp = authed_get(
525        client,
526        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
527        token,
528    )
529    .send()
530    .await?;
531    let text = ok_or_api_error(resp).await?;
532    let output: OutputResponse = serde_json::from_str(&text)?;
533    Ok(output.output)
534}
535
536/// Fetch session status from the API.
537async fn fetch_session_status(
538    client: &reqwest::Client,
539    base: &str,
540    name: &str,
541    token: Option<&str>,
542) -> Result<String> {
543    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
544        .send()
545        .await?;
546    let text = ok_or_api_error(resp).await?;
547    let session: Session = serde_json::from_str(&text)?;
548    Ok(session.status.to_string())
549}
550
/// Compute the new trailing lines that differ from the previous output.
///
/// The output endpoint returns the last N lines from the terminal pane. As new
/// lines appear, old lines at the top scroll off. We look for the last line of
/// `prev` inside `new` (searching from the end), check that the lines leading
/// up to it match the tail of `prev`, and return only the part of `new` that
/// follows that overlap.
///
/// Returns `new` unchanged when `prev` is empty or no overlap exists, and an
/// empty string when `new` has no lines or nothing follows the overlap.
///
/// Lines are compared without their terminators, and the returned slice starts
/// exactly after the terminator of the last overlapping line, whether that
/// terminator is `\n` or `\r\n`.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // A non-empty `prev` always yields at least one line; treat the impossible
    // empty case like "no overlap" instead of indexing.
    let last_prev = match prev_lines.last() {
        Some(line) => *line,
        None => return new,
    };

    // Walk `new` backwards looking for the last line of `prev`.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Verify contiguous overlap: the lines before this one must match too.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail == new_overlap {
            // `split_inclusive` yields the same segments as `lines()` but keeps
            // each terminator, so the summed lengths are the exact byte offset
            // after line `i` for both `\n` and `\r\n` endings.
            let consumed: usize = new
                .split_inclusive('\n')
                .take(i + 1)
                .map(str::len)
                .sum();
            return &new[consumed..];
        }
    }

    // No overlap found — output changed completely, print it all
    new
}
593
594/// Follow logs by polling, printing only new output. Returns when the session ends.
595async fn follow_logs(
596    client: &reqwest::Client,
597    base: &str,
598    name: &str,
599    lines: usize,
600    token: Option<&str>,
601    writer: &mut (dyn std::io::Write + Send),
602) -> Result<()> {
603    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
604    write!(writer, "{prev_output}")?;
605
606    loop {
607        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
608
609        // Check session status
610        let status = fetch_session_status(client, base, name, token).await?;
611        let is_terminal = status == "completed" || status == "dead" || status == "stale";
612
613        // Fetch latest output
614        let new_output = fetch_output(client, base, name, lines, token).await?;
615
616        let diff = diff_output(&prev_output, &new_output);
617        if !diff.is_empty() {
618            write!(writer, "{diff}")?;
619        }
620        prev_output = new_output;
621
622        if is_terminal {
623            break;
624        }
625    }
626    Ok(())
627}
628
629// --- Crontab wrapper ---
630
// Marker that, followed by the schedule name, ends every crontab line written by
// `build_crontab_line`; the `crontab_*` helpers use it to find pulpo's own entries.
631#[cfg_attr(coverage, allow(dead_code))]
632const CRONTAB_TAG: &str = "#pulpo:";
633
634/// Read the current crontab. Returns empty string if no crontab exists.
635#[cfg(not(coverage))]
636fn read_crontab() -> Result<String> {
637    let output = std::process::Command::new("crontab").arg("-l").output()?;
638    if output.status.success() {
639        Ok(String::from_utf8_lossy(&output.stdout).to_string())
640    } else {
641        Ok(String::new())
642    }
643}
644
645/// Write the given content as the user's crontab.
646#[cfg(not(coverage))]
647fn write_crontab(content: &str) -> Result<()> {
648    use std::io::Write;
649    let mut child = std::process::Command::new("crontab")
650        .arg("-")
651        .stdin(std::process::Stdio::piped())
652        .spawn()?;
653    child
654        .stdin
655        .as_mut()
656        .unwrap()
657        .write_all(content.as_bytes())?;
658    let status = child.wait()?;
659    if !status.success() {
660        anyhow::bail!("crontab write failed");
661    }
662    Ok(())
663}
664
665/// Build the crontab line for a pulpo schedule.
666#[cfg_attr(coverage, allow(dead_code))]
667fn build_crontab_line(
668    name: &str,
669    cron: &str,
670    workdir: &str,
671    provider: &str,
672    prompt: &str,
673    node: &str,
674) -> String {
675    format!(
676        "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
677    )
678}
679
680/// Install a cron schedule into a crontab string. Returns the updated crontab.
681#[cfg_attr(coverage, allow(dead_code))]
682fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
683    let tag = format!("{CRONTAB_TAG}{name}");
684    if crontab.contains(&tag) {
685        anyhow::bail!("schedule \"{name}\" already exists — remove it first");
686    }
687    let mut result = crontab.to_owned();
688    result.push_str(line);
689    Ok(result)
690}
691
692/// Format pulpo crontab entries for display.
693#[cfg_attr(coverage, allow(dead_code))]
694fn crontab_list(crontab: &str) -> String {
695    let entries: Vec<&str> = crontab
696        .lines()
697        .filter(|l| l.contains(CRONTAB_TAG))
698        .collect();
699    if entries.is_empty() {
700        return "No pulpo schedules.".into();
701    }
702    let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
703    for entry in entries {
704        let paused = entry.starts_with('#');
705        let raw = entry.trim_start_matches('#').trim();
706        let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
707        let parts: Vec<&str> = raw.splitn(6, ' ').collect();
708        let cron_expr = if parts.len() >= 5 {
709            parts[..5].join(" ")
710        } else {
711            "?".into()
712        };
713        lines.push(format!(
714            "{:<20} {:<15} {}",
715            name,
716            cron_expr,
717            if paused { "yes" } else { "no" }
718        ));
719    }
720    lines.join("\n")
721}
722
723/// Remove a schedule from a crontab string. Returns the updated crontab.
724#[cfg_attr(coverage, allow(dead_code))]
725fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
726    use std::fmt::Write;
727    let tag = format!("{CRONTAB_TAG}{name}");
728    let filtered =
729        crontab
730            .lines()
731            .filter(|l| !l.contains(&tag))
732            .fold(String::new(), |mut acc, l| {
733                writeln!(acc, "{l}").unwrap();
734                acc
735            });
736    if filtered.len() == crontab.len() {
737        anyhow::bail!("schedule \"{name}\" not found");
738    }
739    Ok(filtered)
740}
741
742/// Pause (comment out) a schedule in a crontab string.
743#[cfg_attr(coverage, allow(dead_code))]
744fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
745    use std::fmt::Write;
746    let tag = format!("{CRONTAB_TAG}{name}");
747    let mut found = false;
748    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
749        if l.contains(&tag) && !l.starts_with('#') {
750            found = true;
751            writeln!(acc, "#{l}").unwrap();
752        } else {
753            writeln!(acc, "{l}").unwrap();
754        }
755        acc
756    });
757    if !found {
758        anyhow::bail!("schedule \"{name}\" not found or already paused");
759    }
760    Ok(updated)
761}
762
763/// Resume (uncomment) a schedule in a crontab string.
764#[cfg_attr(coverage, allow(dead_code))]
765fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
766    use std::fmt::Write;
767    let tag = format!("{CRONTAB_TAG}{name}");
768    let mut found = false;
769    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
770        if l.contains(&tag) && l.starts_with('#') {
771            found = true;
772            writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
773        } else {
774            writeln!(acc, "{l}").unwrap();
775        }
776        acc
777    });
778    if !found {
779        anyhow::bail!("schedule \"{name}\" not found or not paused");
780    }
781    Ok(updated)
782}
783
784/// Execute a schedule subcommand using the crontab wrapper.
785#[cfg(not(coverage))]
786fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
787    match action {
788        ScheduleAction::Install {
789            name,
790            cron,
791            workdir,
792            provider,
793            prompt,
794        } => {
795            let crontab = read_crontab()?;
796            let joined_prompt = prompt.join(" ");
797            let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
798            let updated = crontab_install(&crontab, name, &line)?;
799            write_crontab(&updated)?;
800            Ok(format!("Installed schedule \"{name}\""))
801        }
802        ScheduleAction::List => {
803            let crontab = read_crontab()?;
804            Ok(crontab_list(&crontab))
805        }
806        ScheduleAction::Remove { name } => {
807            let crontab = read_crontab()?;
808            let updated = crontab_remove(&crontab, name)?;
809            write_crontab(&updated)?;
810            Ok(format!("Removed schedule \"{name}\""))
811        }
812        ScheduleAction::Pause { name } => {
813            let crontab = read_crontab()?;
814            let updated = crontab_pause(&crontab, name)?;
815            write_crontab(&updated)?;
816            Ok(format!("Paused schedule \"{name}\""))
817        }
818        ScheduleAction::Resume { name } => {
819            let crontab = read_crontab()?;
820            let updated = crontab_resume(&crontab, name)?;
821            write_crontab(&updated)?;
822            Ok(format!("Resumed schedule \"{name}\""))
823        }
824    }
825}
826
827/// Stub for coverage builds — crontab is real I/O.
828#[cfg(coverage)]
829fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
830    Ok(String::new())
831}
832
833/// Execute the given CLI command against the specified node.
834#[allow(clippy::too_many_lines)]
835pub async fn execute(cli: &Cli) -> Result<String> {
836    let url = base_url(&cli.node);
837    let client = reqwest::Client::new();
838    let node = &cli.node;
839    let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;
840
841    match &cli.command {
842        Commands::Attach { name } => {
843            // Fetch session to get actual backend_session_id
844            let resp = authed_get(
845                &client,
846                format!("{url}/api/v1/sessions/{name}"),
847                token.as_deref(),
848            )
849            .send()
850            .await
851            .map_err(|e| friendly_error(&e, node))?;
852            let text = ok_or_api_error(resp).await?;
853            let session: Session = serde_json::from_str(&text)?;
854            let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
855            attach_session(&backend_id)?;
856            Ok(format!("Detached from session {name}."))
857        }
858        Commands::Input { name, text } => {
859            let input_text = text.as_deref().unwrap_or("\n");
860            let body = serde_json::json!({ "text": input_text });
861            let resp = authed_post(
862                &client,
863                format!("{url}/api/v1/sessions/{name}/input"),
864                token.as_deref(),
865            )
866            .json(&body)
867            .send()
868            .await
869            .map_err(|e| friendly_error(&e, node))?;
870            ok_or_api_error(resp).await?;
871            Ok(format!("Sent input to session {name}."))
872        }
873        Commands::List => {
874            let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
875                .send()
876                .await
877                .map_err(|e| friendly_error(&e, node))?;
878            let text = ok_or_api_error(resp).await?;
879            let sessions: Vec<Session> = serde_json::from_str(&text)?;
880            Ok(format_sessions(&sessions))
881        }
882        Commands::Nodes => {
883            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
884                .send()
885                .await
886                .map_err(|e| friendly_error(&e, node))?;
887            let text = ok_or_api_error(resp).await?;
888            let resp: PeersResponse = serde_json::from_str(&text)?;
889            Ok(format_nodes(&resp))
890        }
891        Commands::Spawn {
892            workdir,
893            name,
894            provider,
895            auto,
896            unrestricted,
897            model,
898            system_prompt,
899            allowed_tools,
900            ink,
901            max_turns,
902            max_budget,
903            output_format,
904            prompt,
905        } => {
906            let prompt_text = prompt.join(" ");
907            let mode = if *auto { "autonomous" } else { "interactive" };
908            // Resolve workdir: --workdir flag > current directory
909            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
910                std::env::current_dir()
911                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
912            });
913            let mut body = serde_json::json!({
914                "workdir": resolved_workdir,
915                "mode": mode,
916            });
917            // Only include prompt if non-empty
918            if !prompt_text.is_empty() {
919                body["prompt"] = serde_json::json!(prompt_text);
920            }
921            // Only include provider if explicitly specified
922            if let Some(p) = provider {
923                body["provider"] = serde_json::json!(p);
924            }
925            if *unrestricted {
926                body["unrestricted"] = serde_json::json!(true);
927            }
928            if let Some(n) = name {
929                body["name"] = serde_json::json!(n);
930            }
931            if let Some(m) = model {
932                body["model"] = serde_json::json!(m);
933            }
934            if let Some(sp) = system_prompt {
935                body["system_prompt"] = serde_json::json!(sp);
936            }
937            if let Some(tools) = allowed_tools {
938                body["allowed_tools"] = serde_json::json!(tools);
939            }
940            if let Some(p) = ink {
941                body["ink"] = serde_json::json!(p);
942            }
943            if let Some(mt) = max_turns {
944                body["max_turns"] = serde_json::json!(mt);
945            }
946            if let Some(mb) = max_budget {
947                body["max_budget_usd"] = serde_json::json!(mb);
948            }
949            if let Some(of) = output_format {
950                body["output_format"] = serde_json::json!(of);
951            }
952            let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
953                .json(&body)
954                .send()
955                .await
956                .map_err(|e| friendly_error(&e, node))?;
957            let text = ok_or_api_error(resp).await?;
958            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
959            let mut msg = format!(
960                "Created session \"{}\" ({})",
961                resp.session.name, resp.session.id
962            );
963            for w in &resp.warnings {
964                use std::fmt::Write;
965                let _ = write!(msg, "\n  Warning: {w}");
966            }
967            Ok(msg)
968        }
969        Commands::Kill { name } => {
970            let resp = authed_post(
971                &client,
972                format!("{url}/api/v1/sessions/{name}/kill"),
973                token.as_deref(),
974            )
975            .send()
976            .await
977            .map_err(|e| friendly_error(&e, node))?;
978            ok_or_api_error(resp).await?;
979            Ok(format!("Session {name} killed."))
980        }
981        Commands::Delete { name } => {
982            let resp = authed_delete(
983                &client,
984                format!("{url}/api/v1/sessions/{name}"),
985                token.as_deref(),
986            )
987            .send()
988            .await
989            .map_err(|e| friendly_error(&e, node))?;
990            ok_or_api_error(resp).await?;
991            Ok(format!("Session {name} deleted."))
992        }
993        Commands::Logs {
994            name,
995            lines,
996            follow,
997        } => {
998            if *follow {
999                let mut stdout = std::io::stdout();
1000                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1001                    .await
1002                    .map_err(|e| {
1003                        // Unwrap reqwest errors to friendly messages
1004                        match e.downcast::<reqwest::Error>() {
1005                            Ok(re) => friendly_error(&re, node),
1006                            Err(other) => other,
1007                        }
1008                    })?;
1009                Ok(String::new())
1010            } else {
1011                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1012                    .await
1013                    .map_err(|e| match e.downcast::<reqwest::Error>() {
1014                        Ok(re) => friendly_error(&re, node),
1015                        Err(other) => other,
1016                    })?;
1017                Ok(output)
1018            }
1019        }
1020        Commands::Interventions { name } => {
1021            let resp = authed_get(
1022                &client,
1023                format!("{url}/api/v1/sessions/{name}/interventions"),
1024                token.as_deref(),
1025            )
1026            .send()
1027            .await
1028            .map_err(|e| friendly_error(&e, node))?;
1029            let text = ok_or_api_error(resp).await?;
1030            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1031            Ok(format_interventions(&events))
1032        }
1033        Commands::Ui => {
1034            let dashboard = base_url(&cli.node);
1035            open_browser(&dashboard)?;
1036            Ok(format!("Opening {dashboard}"))
1037        }
1038        Commands::Resume { name } => {
1039            let resp = authed_post(
1040                &client,
1041                format!("{url}/api/v1/sessions/{name}/resume"),
1042                token.as_deref(),
1043            )
1044            .send()
1045            .await
1046            .map_err(|e| friendly_error(&e, node))?;
1047            let text = ok_or_api_error(resp).await?;
1048            let session: Session = serde_json::from_str(&text)?;
1049            Ok(format!("Resumed session \"{}\"", session.name))
1050        }
1051        Commands::Knowledge {
1052            session,
1053            kind,
1054            repo,
1055            ink,
1056            limit,
1057            context,
1058            get,
1059            delete,
1060            push,
1061        } => {
1062            // Single-item get
1063            if let Some(id) = get {
1064                let endpoint = format!("{url}/api/v1/knowledge/{id}");
1065                let resp = authed_get(&client, endpoint, token.as_deref())
1066                    .send()
1067                    .await
1068                    .map_err(|e| friendly_error(&e, node))?;
1069                let text = ok_or_api_error(resp).await?;
1070                let resp: KnowledgeItemResponse = serde_json::from_str(&text)?;
1071                return Ok(format_knowledge(&[resp.knowledge]));
1072            }
1073
1074            // Delete by ID
1075            if let Some(id) = delete {
1076                let endpoint = format!("{url}/api/v1/knowledge/{id}");
1077                let resp = authed_delete(&client, endpoint, token.as_deref())
1078                    .send()
1079                    .await
1080                    .map_err(|e| friendly_error(&e, node))?;
1081                let text = ok_or_api_error(resp).await?;
1082                let resp: KnowledgeDeleteResponse = serde_json::from_str(&text)?;
1083                return Ok(if resp.deleted {
1084                    format!("Deleted knowledge item {id}")
1085                } else {
1086                    format!("Knowledge item {id} not found")
1087                });
1088            }
1089
1090            // Push to remote
1091            if *push {
1092                let endpoint = format!("{url}/api/v1/knowledge/push");
1093                let resp = authed_post(&client, endpoint, token.as_deref())
1094                    .send()
1095                    .await
1096                    .map_err(|e| friendly_error(&e, node))?;
1097                let text = ok_or_api_error(resp).await?;
1098                let resp: KnowledgePushResponse = serde_json::from_str(&text)?;
1099                return Ok(resp.message);
1100            }
1101
1102            // List / context query
1103            let mut params = vec![format!("limit={limit}")];
1104            let endpoint = if *context {
1105                if let Some(r) = repo {
1106                    params.push(format!("workdir={r}"));
1107                }
1108                if let Some(i) = ink {
1109                    params.push(format!("ink={i}"));
1110                }
1111                format!("{url}/api/v1/knowledge/context?{}", params.join("&"))
1112            } else {
1113                if let Some(s) = session {
1114                    params.push(format!("session_id={s}"));
1115                }
1116                if let Some(k) = kind {
1117                    params.push(format!("kind={k}"));
1118                }
1119                if let Some(r) = repo {
1120                    params.push(format!("repo={r}"));
1121                }
1122                if let Some(i) = ink {
1123                    params.push(format!("ink={i}"));
1124                }
1125                format!("{url}/api/v1/knowledge?{}", params.join("&"))
1126            };
1127            let resp = authed_get(&client, endpoint, token.as_deref())
1128                .send()
1129                .await
1130                .map_err(|e| friendly_error(&e, node))?;
1131            let text = ok_or_api_error(resp).await?;
1132            let resp: KnowledgeResponse = serde_json::from_str(&text)?;
1133            Ok(format_knowledge(&resp.knowledge))
1134        }
1135        Commands::Schedule { action } => execute_schedule(action, node),
1136    }
1137}
1138
1139#[cfg(test)]
1140mod tests {
1141    use super::*;
1142
1143    #[test]
1144    fn test_base_url() {
1145        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1146        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1147    }
1148
1149    #[test]
1150    fn test_cli_parse_list() {
1151        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1152        assert_eq!(cli.node, "localhost:7433");
1153        assert!(matches!(cli.command, Commands::List));
1154    }
1155
1156    #[test]
1157    fn test_cli_parse_nodes() {
1158        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1159        assert!(matches!(cli.command, Commands::Nodes));
1160    }
1161
1162    #[test]
1163    fn test_cli_parse_ui() {
1164        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1165        assert!(matches!(cli.command, Commands::Ui));
1166    }
1167
1168    #[test]
1169    fn test_cli_parse_ui_custom_node() {
1170        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1171        assert!(matches!(cli.command, Commands::Ui));
1172        assert_eq!(cli.node, "mac-mini:7433");
1173    }
1174
1175    #[test]
1176    fn test_build_open_command() {
1177        let cmd = build_open_command("http://localhost:7433");
1178        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1179        assert_eq!(args, vec!["http://localhost:7433"]);
1180        #[cfg(target_os = "macos")]
1181        assert_eq!(cmd.get_program(), "open");
1182        #[cfg(target_os = "linux")]
1183        assert_eq!(cmd.get_program(), "xdg-open");
1184    }
1185
1186    #[test]
1187    fn test_cli_parse_spawn() {
1188        let cli = Cli::try_parse_from([
1189            "pulpo",
1190            "spawn",
1191            "--workdir",
1192            "/tmp/repo",
1193            "Fix",
1194            "the",
1195            "bug",
1196        ])
1197        .unwrap();
1198        assert!(matches!(
1199            &cli.command,
1200            Commands::Spawn { workdir, provider, auto, unrestricted, prompt, .. }
1201                if workdir.as_deref() == Some("/tmp/repo") && provider.is_none() && !auto
1202                && !unrestricted && prompt == &["Fix", "the", "bug"]
1203        ));
1204    }
1205
1206    #[test]
1207    fn test_cli_parse_spawn_with_provider() {
1208        let cli = Cli::try_parse_from([
1209            "pulpo",
1210            "spawn",
1211            "--workdir",
1212            "/tmp",
1213            "--provider",
1214            "codex",
1215            "Do it",
1216        ])
1217        .unwrap();
1218        assert!(matches!(
1219            &cli.command,
1220            Commands::Spawn { provider, .. } if provider.as_deref() == Some("codex")
1221        ));
1222    }
1223
1224    #[test]
1225    fn test_cli_parse_spawn_auto() {
1226        let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "--auto", "Do it"])
1227            .unwrap();
1228        assert!(matches!(
1229            &cli.command,
1230            Commands::Spawn { auto, .. } if *auto
1231        ));
1232    }
1233
1234    #[test]
1235    fn test_cli_parse_spawn_unrestricted() {
1236        let cli = Cli::try_parse_from([
1237            "pulpo",
1238            "spawn",
1239            "--workdir",
1240            "/tmp",
1241            "--unrestricted",
1242            "Do it",
1243        ])
1244        .unwrap();
1245        assert!(matches!(
1246            &cli.command,
1247            Commands::Spawn { unrestricted, .. } if *unrestricted
1248        ));
1249    }
1250
1251    #[test]
1252    fn test_cli_parse_spawn_unrestricted_default() {
1253        let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "Do it"]).unwrap();
1254        assert!(matches!(
1255            &cli.command,
1256            Commands::Spawn { unrestricted, .. } if !unrestricted
1257        ));
1258    }
1259
1260    #[test]
1261    fn test_cli_parse_spawn_with_name() {
1262        let cli = Cli::try_parse_from([
1263            "pulpo",
1264            "spawn",
1265            "--workdir",
1266            "/tmp/repo",
1267            "--name",
1268            "my-task",
1269            "Fix it",
1270        ])
1271        .unwrap();
1272        assert!(matches!(
1273            &cli.command,
1274            Commands::Spawn { workdir, name, .. }
1275                if workdir.as_deref() == Some("/tmp/repo") && name.as_deref() == Some("my-task")
1276        ));
1277    }
1278
1279    #[test]
1280    fn test_cli_parse_spawn_without_name() {
1281        let cli =
1282            Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"]).unwrap();
1283        assert!(matches!(
1284            &cli.command,
1285            Commands::Spawn { name, .. } if name.is_none()
1286        ));
1287    }
1288
1289    #[test]
1290    fn test_cli_parse_logs() {
1291        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1292        assert!(matches!(
1293            &cli.command,
1294            Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1295        ));
1296    }
1297
1298    #[test]
1299    fn test_cli_parse_logs_with_lines() {
1300        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1301        assert!(matches!(
1302            &cli.command,
1303            Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 50 && !follow
1304        ));
1305    }
1306
1307    #[test]
1308    fn test_cli_parse_logs_follow() {
1309        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1310        assert!(matches!(
1311            &cli.command,
1312            Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1313        ));
1314    }
1315
1316    #[test]
1317    fn test_cli_parse_logs_follow_short() {
1318        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1319        assert!(matches!(
1320            &cli.command,
1321            Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1322        ));
1323    }
1324
1325    #[test]
1326    fn test_cli_parse_kill() {
1327        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1328        assert!(matches!(
1329            &cli.command,
1330            Commands::Kill { name } if name == "my-session"
1331        ));
1332    }
1333
1334    #[test]
1335    fn test_cli_parse_delete() {
1336        let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1337        assert!(matches!(
1338            &cli.command,
1339            Commands::Delete { name } if name == "my-session"
1340        ));
1341    }
1342
1343    #[test]
1344    fn test_cli_parse_resume() {
1345        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1346        assert!(matches!(
1347            &cli.command,
1348            Commands::Resume { name } if name == "my-session"
1349        ));
1350    }
1351
1352    #[test]
1353    fn test_cli_parse_input() {
1354        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1355        assert!(matches!(
1356            &cli.command,
1357            Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("yes")
1358        ));
1359    }
1360
1361    #[test]
1362    fn test_cli_parse_input_no_text() {
1363        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1364        assert!(matches!(
1365            &cli.command,
1366            Commands::Input { name, text } if name == "my-session" && text.is_none()
1367        ));
1368    }
1369
1370    #[test]
1371    fn test_cli_parse_input_alias() {
1372        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1373        assert!(matches!(
1374            &cli.command,
1375            Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("y")
1376        ));
1377    }
1378
1379    #[test]
1380    fn test_cli_parse_custom_node() {
1381        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1382        assert_eq!(cli.node, "win-pc:8080");
1383    }
1384
1385    #[test]
1386    fn test_cli_version() {
1387        let result = Cli::try_parse_from(["pulpo", "--version"]);
1388        // clap exits with an error (kind DisplayVersion) when --version is used
1389        let err = result.unwrap_err();
1390        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1391    }
1392
1393    #[test]
1394    fn test_cli_parse_no_subcommand_fails() {
1395        let result = Cli::try_parse_from(["pulpo"]);
1396        assert!(result.is_err());
1397    }
1398
1399    #[test]
1400    fn test_cli_debug() {
1401        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1402        let debug = format!("{cli:?}");
1403        assert!(debug.contains("List"));
1404    }
1405
1406    #[test]
1407    fn test_commands_debug() {
1408        let cmd = Commands::List;
1409        assert_eq!(format!("{cmd:?}"), "List");
1410    }
1411
    /// A valid Session JSON for test responses.
    ///
    /// Served verbatim by the mock server's session and resume routes, and
    /// embedded in the create-session response built by
    /// `test_create_response_json`.
    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","provider":"claude","prompt":"Fix bug","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1414
1415    /// A valid `CreateSessionResponse` JSON wrapping the session.
1416    fn test_create_response_json() -> String {
1417        format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1418    }
1419
1420    /// Start a lightweight test HTTP server and return its address.
1421    async fn start_test_server() -> String {
1422        use axum::http::StatusCode;
1423        use axum::{
1424            Json, Router,
1425            routing::{get, post},
1426        };
1427
1428        let create_json = test_create_response_json();
1429
1430        let app = Router::new()
1431            .route(
1432                "/api/v1/sessions",
1433                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1434                    (StatusCode::CREATED, create_json.clone())
1435                }),
1436            )
1437            .route(
1438                "/api/v1/sessions/{id}",
1439                get(|| async { TEST_SESSION_JSON.to_owned() })
1440                    .delete(|| async { StatusCode::NO_CONTENT }),
1441            )
1442            .route(
1443                "/api/v1/sessions/{id}/kill",
1444                post(|| async { StatusCode::NO_CONTENT }),
1445            )
1446            .route(
1447                "/api/v1/sessions/{id}/output",
1448                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1449            )
1450            .route(
1451                "/api/v1/peers",
1452                get(|| async {
1453                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1454                }),
1455            )
1456            .route(
1457                "/api/v1/sessions/{id}/resume",
1458                axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1459            )
1460            .route(
1461                "/api/v1/sessions/{id}/interventions",
1462                get(|| async { "[]".to_owned() }),
1463            )
1464            .route(
1465                "/api/v1/sessions/{id}/input",
1466                post(|| async { StatusCode::NO_CONTENT }),
1467            );
1468
1469        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1470        let addr = listener.local_addr().unwrap();
1471        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1472        format!("127.0.0.1:{}", addr.port())
1473    }
1474
1475    #[tokio::test]
1476    async fn test_execute_list_success() {
1477        let node = start_test_server().await;
1478        let cli = Cli {
1479            node,
1480            token: None,
1481            command: Commands::List,
1482        };
1483        let result = execute(&cli).await.unwrap();
1484        assert_eq!(result, "No sessions.");
1485    }
1486
1487    #[tokio::test]
1488    async fn test_execute_nodes_success() {
1489        let node = start_test_server().await;
1490        let cli = Cli {
1491            node,
1492            token: None,
1493            command: Commands::Nodes,
1494        };
1495        let result = execute(&cli).await.unwrap();
1496        assert!(result.contains("test"));
1497        assert!(result.contains("(local)"));
1498        assert!(result.contains("NAME"));
1499    }
1500
1501    #[tokio::test]
1502    async fn test_execute_spawn_success() {
1503        let node = start_test_server().await;
1504        let cli = Cli {
1505            node,
1506            token: None,
1507            command: Commands::Spawn {
1508                workdir: Some("/tmp/repo".into()),
1509                name: None,
1510                provider: Some("claude".into()),
1511                auto: false,
1512                unrestricted: false,
1513                model: None,
1514                system_prompt: None,
1515                allowed_tools: None,
1516                ink: None,
1517                max_turns: None,
1518                max_budget: None,
1519                output_format: None,
1520                prompt: vec!["Fix".into(), "bug".into()],
1521            },
1522        };
1523        let result = execute(&cli).await.unwrap();
1524        assert!(result.contains("Created session"));
1525        assert!(result.contains("repo"));
1526    }
1527
1528    #[tokio::test]
1529    async fn test_execute_spawn_with_all_new_flags() {
1530        let node = start_test_server().await;
1531        let cli = Cli {
1532            node,
1533            token: None,
1534            command: Commands::Spawn {
1535                workdir: Some("/tmp/repo".into()),
1536                name: None,
1537                provider: Some("claude".into()),
1538                auto: false,
1539                unrestricted: false,
1540                model: Some("opus".into()),
1541                system_prompt: Some("Be helpful".into()),
1542                allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1543                ink: Some("coder".into()),
1544                max_turns: Some(5),
1545                max_budget: Some(2.5),
1546                output_format: Some("json".into()),
1547                prompt: vec!["Fix".into(), "bug".into()],
1548            },
1549        };
1550        let result = execute(&cli).await.unwrap();
1551        assert!(result.contains("Created session"));
1552    }
1553
1554    #[tokio::test]
1555    async fn test_execute_spawn_auto_mode() {
1556        let node = start_test_server().await;
1557        let cli = Cli {
1558            node,
1559            token: None,
1560            command: Commands::Spawn {
1561                workdir: Some("/tmp/repo".into()),
1562                name: None,
1563                provider: Some("claude".into()),
1564                auto: true,
1565                unrestricted: false,
1566                model: None,
1567                system_prompt: None,
1568                allowed_tools: None,
1569                ink: None,
1570                max_turns: None,
1571                max_budget: None,
1572                output_format: None,
1573                prompt: vec!["Do it".into()],
1574            },
1575        };
1576        let result = execute(&cli).await.unwrap();
1577        assert!(result.contains("Created session"));
1578    }
1579
1580    #[tokio::test]
1581    async fn test_execute_spawn_with_name() {
1582        let node = start_test_server().await;
1583        let cli = Cli {
1584            node,
1585            token: None,
1586            command: Commands::Spawn {
1587                workdir: Some("/tmp/repo".into()),
1588                name: Some("my-task".into()),
1589                provider: Some("claude".into()),
1590                auto: false,
1591                unrestricted: false,
1592                model: None,
1593                system_prompt: None,
1594                allowed_tools: None,
1595                ink: None,
1596                max_turns: None,
1597                max_budget: None,
1598                output_format: None,
1599                prompt: vec!["Fix".into(), "bug".into()],
1600            },
1601        };
1602        let result = execute(&cli).await.unwrap();
1603        assert!(result.contains("Created session"));
1604    }
1605
1606    #[tokio::test]
1607    async fn test_execute_kill_success() {
1608        let node = start_test_server().await;
1609        let cli = Cli {
1610            node,
1611            token: None,
1612            command: Commands::Kill {
1613                name: "test-session".into(),
1614            },
1615        };
1616        let result = execute(&cli).await.unwrap();
1617        assert!(result.contains("killed"));
1618    }
1619
1620    #[tokio::test]
1621    async fn test_execute_delete_success() {
1622        let node = start_test_server().await;
1623        let cli = Cli {
1624            node,
1625            token: None,
1626            command: Commands::Delete {
1627                name: "test-session".into(),
1628            },
1629        };
1630        let result = execute(&cli).await.unwrap();
1631        assert!(result.contains("deleted"));
1632    }
1633
1634    #[tokio::test]
1635    async fn test_execute_logs_success() {
1636        let node = start_test_server().await;
1637        let cli = Cli {
1638            node,
1639            token: None,
1640            command: Commands::Logs {
1641                name: "test-session".into(),
1642                lines: 50,
1643                follow: false,
1644            },
1645        };
1646        let result = execute(&cli).await.unwrap();
1647        assert!(result.contains("test output"));
1648    }
1649
1650    #[tokio::test]
1651    async fn test_execute_list_connection_refused() {
1652        let cli = Cli {
1653            node: "localhost:1".into(),
1654            token: None,
1655            command: Commands::List,
1656        };
1657        let result = execute(&cli).await;
1658        let err = result.unwrap_err().to_string();
1659        assert!(
1660            err.contains("Could not connect to pulpod"),
1661            "Expected friendly error, got: {err}"
1662        );
1663        assert!(err.contains("localhost:1"));
1664    }
1665
1666    #[tokio::test]
1667    async fn test_execute_nodes_connection_refused() {
1668        let cli = Cli {
1669            node: "localhost:1".into(),
1670            token: None,
1671            command: Commands::Nodes,
1672        };
1673        let result = execute(&cli).await;
1674        let err = result.unwrap_err().to_string();
1675        assert!(err.contains("Could not connect to pulpod"));
1676    }
1677
1678    #[tokio::test]
1679    async fn test_execute_kill_error_response() {
1680        use axum::{Router, http::StatusCode, routing::post};
1681
1682        let app = Router::new().route(
1683            "/api/v1/sessions/{id}/kill",
1684            post(|| async {
1685                (
1686                    StatusCode::NOT_FOUND,
1687                    "{\"error\":\"session not found: test-session\"}",
1688                )
1689            }),
1690        );
1691        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1692        let addr = listener.local_addr().unwrap();
1693        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1694        let node = format!("127.0.0.1:{}", addr.port());
1695
1696        let cli = Cli {
1697            node,
1698            token: None,
1699            command: Commands::Kill {
1700                name: "test-session".into(),
1701            },
1702        };
1703        let err = execute(&cli).await.unwrap_err();
1704        assert_eq!(err.to_string(), "session not found: test-session");
1705    }
1706
1707    #[tokio::test]
1708    async fn test_execute_delete_error_response() {
1709        use axum::{Router, http::StatusCode, routing::delete};
1710
1711        let app = Router::new().route(
1712            "/api/v1/sessions/{id}",
1713            delete(|| async {
1714                (
1715                    StatusCode::CONFLICT,
1716                    "{\"error\":\"cannot delete session in 'running' state\"}",
1717                )
1718            }),
1719        );
1720        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1721        let addr = listener.local_addr().unwrap();
1722        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1723        let node = format!("127.0.0.1:{}", addr.port());
1724
1725        let cli = Cli {
1726            node,
1727            token: None,
1728            command: Commands::Delete {
1729                name: "test-session".into(),
1730            },
1731        };
1732        let err = execute(&cli).await.unwrap_err();
1733        assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1734    }
1735
1736    #[tokio::test]
1737    async fn test_execute_logs_error_response() {
1738        use axum::{Router, http::StatusCode, routing::get};
1739
1740        let app = Router::new().route(
1741            "/api/v1/sessions/{id}/output",
1742            get(|| async {
1743                (
1744                    StatusCode::NOT_FOUND,
1745                    "{\"error\":\"session not found: ghost\"}",
1746                )
1747            }),
1748        );
1749        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1750        let addr = listener.local_addr().unwrap();
1751        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1752        let node = format!("127.0.0.1:{}", addr.port());
1753
1754        let cli = Cli {
1755            node,
1756            token: None,
1757            command: Commands::Logs {
1758                name: "ghost".into(),
1759                lines: 50,
1760                follow: false,
1761            },
1762        };
1763        let err = execute(&cli).await.unwrap_err();
1764        assert_eq!(err.to_string(), "session not found: ghost");
1765    }
1766
1767    #[tokio::test]
1768    async fn test_execute_resume_error_response() {
1769        use axum::{Router, http::StatusCode, routing::post};
1770
1771        let app = Router::new().route(
1772            "/api/v1/sessions/{id}/resume",
1773            post(|| async {
1774                (
1775                    StatusCode::BAD_REQUEST,
1776                    "{\"error\":\"session is not stale (status: running)\"}",
1777                )
1778            }),
1779        );
1780        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1781        let addr = listener.local_addr().unwrap();
1782        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1783        let node = format!("127.0.0.1:{}", addr.port());
1784
1785        let cli = Cli {
1786            node,
1787            token: None,
1788            command: Commands::Resume {
1789                name: "test-session".into(),
1790            },
1791        };
1792        let err = execute(&cli).await.unwrap_err();
1793        assert_eq!(err.to_string(), "session is not stale (status: running)");
1794    }
1795
1796    #[tokio::test]
1797    async fn test_execute_spawn_error_response() {
1798        use axum::{Router, http::StatusCode, routing::post};
1799
1800        let app = Router::new().route(
1801            "/api/v1/sessions",
1802            post(|| async {
1803                (
1804                    StatusCode::INTERNAL_SERVER_ERROR,
1805                    "{\"error\":\"failed to spawn session\"}",
1806                )
1807            }),
1808        );
1809        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1810        let addr = listener.local_addr().unwrap();
1811        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1812        let node = format!("127.0.0.1:{}", addr.port());
1813
1814        let cli = Cli {
1815            node,
1816            token: None,
1817            command: Commands::Spawn {
1818                workdir: Some("/tmp/repo".into()),
1819                name: None,
1820                provider: Some("claude".into()),
1821                auto: false,
1822                unrestricted: false,
1823                model: None,
1824                system_prompt: None,
1825                allowed_tools: None,
1826                ink: None,
1827                max_turns: None,
1828                max_budget: None,
1829                output_format: None,
1830                prompt: vec!["test".into()],
1831            },
1832        };
1833        let err = execute(&cli).await.unwrap_err();
1834        assert_eq!(err.to_string(), "failed to spawn session");
1835    }
1836
1837    #[tokio::test]
1838    async fn test_execute_interventions_error_response() {
1839        use axum::{Router, http::StatusCode, routing::get};
1840
1841        let app = Router::new().route(
1842            "/api/v1/sessions/{id}/interventions",
1843            get(|| async {
1844                (
1845                    StatusCode::NOT_FOUND,
1846                    "{\"error\":\"session not found: ghost\"}",
1847                )
1848            }),
1849        );
1850        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1851        let addr = listener.local_addr().unwrap();
1852        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1853        let node = format!("127.0.0.1:{}", addr.port());
1854
1855        let cli = Cli {
1856            node,
1857            token: None,
1858            command: Commands::Interventions {
1859                name: "ghost".into(),
1860            },
1861        };
1862        let err = execute(&cli).await.unwrap_err();
1863        assert_eq!(err.to_string(), "session not found: ghost");
1864    }
1865
1866    #[tokio::test]
1867    async fn test_execute_resume_success() {
1868        let node = start_test_server().await;
1869        let cli = Cli {
1870            node,
1871            token: None,
1872            command: Commands::Resume {
1873                name: "test-session".into(),
1874            },
1875        };
1876        let result = execute(&cli).await.unwrap();
1877        assert!(result.contains("Resumed session"));
1878        assert!(result.contains("repo"));
1879    }
1880
1881    #[tokio::test]
1882    async fn test_execute_input_success() {
1883        let node = start_test_server().await;
1884        let cli = Cli {
1885            node,
1886            token: None,
1887            command: Commands::Input {
1888                name: "test-session".into(),
1889                text: Some("yes".into()),
1890            },
1891        };
1892        let result = execute(&cli).await.unwrap();
1893        assert!(result.contains("Sent input to session test-session"));
1894    }
1895
1896    #[tokio::test]
1897    async fn test_execute_input_no_text() {
1898        let node = start_test_server().await;
1899        let cli = Cli {
1900            node,
1901            token: None,
1902            command: Commands::Input {
1903                name: "test-session".into(),
1904                text: None,
1905            },
1906        };
1907        let result = execute(&cli).await.unwrap();
1908        assert!(result.contains("Sent input to session test-session"));
1909    }
1910
1911    #[tokio::test]
1912    async fn test_execute_input_connection_refused() {
1913        let cli = Cli {
1914            node: "localhost:1".into(),
1915            token: None,
1916            command: Commands::Input {
1917                name: "test".into(),
1918                text: Some("y".into()),
1919            },
1920        };
1921        let result = execute(&cli).await;
1922        let err = result.unwrap_err().to_string();
1923        assert!(err.contains("Could not connect to pulpod"));
1924    }
1925
1926    #[tokio::test]
1927    async fn test_execute_input_error_response() {
1928        use axum::{Router, http::StatusCode, routing::post};
1929
1930        let app = Router::new().route(
1931            "/api/v1/sessions/{id}/input",
1932            post(|| async {
1933                (
1934                    StatusCode::NOT_FOUND,
1935                    "{\"error\":\"session not found: ghost\"}",
1936                )
1937            }),
1938        );
1939        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1940        let addr = listener.local_addr().unwrap();
1941        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1942        let node = format!("127.0.0.1:{}", addr.port());
1943
1944        let cli = Cli {
1945            node,
1946            token: None,
1947            command: Commands::Input {
1948                name: "ghost".into(),
1949                text: Some("y".into()),
1950            },
1951        };
1952        let err = execute(&cli).await.unwrap_err();
1953        assert_eq!(err.to_string(), "session not found: ghost");
1954    }
1955
1956    #[tokio::test]
1957    async fn test_execute_ui() {
1958        let cli = Cli {
1959            node: "localhost:7433".into(),
1960            token: None,
1961            command: Commands::Ui,
1962        };
1963        let result = execute(&cli).await.unwrap();
1964        assert!(result.contains("Opening"));
1965        assert!(result.contains("http://localhost:7433"));
1966    }
1967
1968    #[tokio::test]
1969    async fn test_execute_ui_custom_node() {
1970        let cli = Cli {
1971            node: "mac-mini:7433".into(),
1972            token: None,
1973            command: Commands::Ui,
1974        };
1975        let result = execute(&cli).await.unwrap();
1976        assert!(result.contains("http://mac-mini:7433"));
1977    }
1978
1979    #[test]
1980    fn test_format_sessions_empty() {
1981        assert_eq!(format_sessions(&[]), "No sessions.");
1982    }
1983
1984    #[test]
1985    fn test_format_sessions_with_data() {
1986        use chrono::Utc;
1987        use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1988        use uuid::Uuid;
1989
1990        let sessions = vec![Session {
1991            id: Uuid::nil(),
1992            name: "my-api".into(),
1993            workdir: "/tmp/repo".into(),
1994            provider: Provider::Claude,
1995            prompt: "Fix the bug".into(),
1996            status: SessionStatus::Running,
1997            mode: SessionMode::Interactive,
1998            conversation_id: None,
1999            exit_code: None,
2000            backend_session_id: None,
2001            output_snapshot: None,
2002            guard_config: None,
2003            model: None,
2004            allowed_tools: None,
2005            system_prompt: None,
2006            metadata: None,
2007            ink: None,
2008            max_turns: None,
2009            max_budget_usd: None,
2010            output_format: None,
2011            intervention_reason: None,
2012            intervention_at: None,
2013            last_output_at: None,
2014            idle_since: None,
2015            waiting_for_input: false,
2016            created_at: Utc::now(),
2017            updated_at: Utc::now(),
2018        }];
2019        let output = format_sessions(&sessions);
2020        assert!(output.contains("NAME"));
2021        assert!(output.contains("my-api"));
2022        assert!(output.contains("running"));
2023        assert!(output.contains("claude"));
2024        assert!(output.contains("Fix the bug"));
2025    }
2026
2027    #[test]
2028    fn test_format_sessions_long_prompt_truncated() {
2029        use chrono::Utc;
2030        use pulpo_common::session::{Provider, SessionMode, SessionStatus};
2031        use uuid::Uuid;
2032
2033        let sessions = vec![Session {
2034            id: Uuid::nil(),
2035            name: "test".into(),
2036            workdir: "/tmp".into(),
2037            provider: Provider::Codex,
2038            prompt: "A very long prompt that exceeds forty characters in total length".into(),
2039            status: SessionStatus::Completed,
2040            mode: SessionMode::Autonomous,
2041            conversation_id: None,
2042            exit_code: None,
2043            backend_session_id: None,
2044            output_snapshot: None,
2045            guard_config: None,
2046            model: None,
2047            allowed_tools: None,
2048            system_prompt: None,
2049            metadata: None,
2050            ink: None,
2051            max_turns: None,
2052            max_budget_usd: None,
2053            output_format: None,
2054            intervention_reason: None,
2055            intervention_at: None,
2056            last_output_at: None,
2057            idle_since: None,
2058            waiting_for_input: false,
2059            created_at: Utc::now(),
2060            updated_at: Utc::now(),
2061        }];
2062        let output = format_sessions(&sessions);
2063        assert!(output.contains("..."));
2064    }
2065
2066    #[test]
2067    fn test_format_sessions_waiting_for_input() {
2068        use chrono::Utc;
2069        use pulpo_common::session::{Provider, SessionMode, SessionStatus};
2070        use uuid::Uuid;
2071
2072        let sessions = vec![Session {
2073            id: Uuid::nil(),
2074            name: "blocked".into(),
2075            workdir: "/tmp".into(),
2076            provider: Provider::Claude,
2077            prompt: "Fix bug".into(),
2078            status: SessionStatus::Running,
2079            mode: SessionMode::Interactive,
2080            conversation_id: None,
2081            exit_code: None,
2082            backend_session_id: None,
2083            output_snapshot: None,
2084            guard_config: None,
2085            model: None,
2086            allowed_tools: None,
2087            system_prompt: None,
2088            metadata: None,
2089            ink: None,
2090            max_turns: None,
2091            max_budget_usd: None,
2092            output_format: None,
2093            intervention_reason: None,
2094            intervention_at: None,
2095            last_output_at: None,
2096            idle_since: None,
2097            waiting_for_input: true,
2098            created_at: Utc::now(),
2099            updated_at: Utc::now(),
2100        }];
2101        let output = format_sessions(&sessions);
2102        assert!(output.contains("waiting"));
2103        assert!(!output.contains("running"));
2104    }
2105
2106    #[test]
2107    fn test_format_nodes() {
2108        use pulpo_common::node::NodeInfo;
2109        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2110
2111        let resp = PeersResponse {
2112            local: NodeInfo {
2113                name: "mac-mini".into(),
2114                hostname: "h".into(),
2115                os: "macos".into(),
2116                arch: "arm64".into(),
2117                cpus: 8,
2118                memory_mb: 16384,
2119                gpu: None,
2120            },
2121            peers: vec![PeerInfo {
2122                name: "win-pc".into(),
2123                address: "win-pc:7433".into(),
2124                status: PeerStatus::Online,
2125                node_info: None,
2126                session_count: Some(3),
2127                source: PeerSource::Configured,
2128            }],
2129        };
2130        let output = format_nodes(&resp);
2131        assert!(output.contains("mac-mini"));
2132        assert!(output.contains("(local)"));
2133        assert!(output.contains("win-pc"));
2134        assert!(output.contains('3'));
2135    }
2136
2137    #[test]
2138    fn test_format_nodes_no_session_count() {
2139        use pulpo_common::node::NodeInfo;
2140        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2141
2142        let resp = PeersResponse {
2143            local: NodeInfo {
2144                name: "local".into(),
2145                hostname: "h".into(),
2146                os: "linux".into(),
2147                arch: "x86_64".into(),
2148                cpus: 4,
2149                memory_mb: 8192,
2150                gpu: None,
2151            },
2152            peers: vec![PeerInfo {
2153                name: "peer".into(),
2154                address: "peer:7433".into(),
2155                status: PeerStatus::Offline,
2156                node_info: None,
2157                session_count: None,
2158                source: PeerSource::Configured,
2159            }],
2160        };
2161        let output = format_nodes(&resp);
2162        assert!(output.contains("offline"));
2163        // No session count → shows "-"
2164        let lines: Vec<&str> = output.lines().collect();
2165        assert!(lines[2].contains('-'));
2166    }
2167
2168    #[tokio::test]
2169    async fn test_execute_resume_connection_refused() {
2170        let cli = Cli {
2171            node: "localhost:1".into(),
2172            token: None,
2173            command: Commands::Resume {
2174                name: "test".into(),
2175            },
2176        };
2177        let result = execute(&cli).await;
2178        let err = result.unwrap_err().to_string();
2179        assert!(err.contains("Could not connect to pulpod"));
2180    }
2181
2182    #[tokio::test]
2183    async fn test_execute_spawn_connection_refused() {
2184        let cli = Cli {
2185            node: "localhost:1".into(),
2186            token: None,
2187            command: Commands::Spawn {
2188                workdir: Some("/tmp".into()),
2189                name: None,
2190                provider: Some("claude".into()),
2191                auto: false,
2192                unrestricted: false,
2193                model: None,
2194                system_prompt: None,
2195                allowed_tools: None,
2196                ink: None,
2197                max_turns: None,
2198                max_budget: None,
2199                output_format: None,
2200                prompt: vec!["test".into()],
2201            },
2202        };
2203        let result = execute(&cli).await;
2204        let err = result.unwrap_err().to_string();
2205        assert!(err.contains("Could not connect to pulpod"));
2206    }
2207
2208    #[tokio::test]
2209    async fn test_execute_kill_connection_refused() {
2210        let cli = Cli {
2211            node: "localhost:1".into(),
2212            token: None,
2213            command: Commands::Kill {
2214                name: "test".into(),
2215            },
2216        };
2217        let result = execute(&cli).await;
2218        let err = result.unwrap_err().to_string();
2219        assert!(err.contains("Could not connect to pulpod"));
2220    }
2221
2222    #[tokio::test]
2223    async fn test_execute_delete_connection_refused() {
2224        let cli = Cli {
2225            node: "localhost:1".into(),
2226            token: None,
2227            command: Commands::Delete {
2228                name: "test".into(),
2229            },
2230        };
2231        let result = execute(&cli).await;
2232        let err = result.unwrap_err().to_string();
2233        assert!(err.contains("Could not connect to pulpod"));
2234    }
2235
2236    #[tokio::test]
2237    async fn test_execute_logs_connection_refused() {
2238        let cli = Cli {
2239            node: "localhost:1".into(),
2240            token: None,
2241            command: Commands::Logs {
2242                name: "test".into(),
2243                lines: 50,
2244                follow: false,
2245            },
2246        };
2247        let result = execute(&cli).await;
2248        let err = result.unwrap_err().to_string();
2249        assert!(err.contains("Could not connect to pulpod"));
2250    }
2251
2252    #[tokio::test]
2253    async fn test_friendly_error_connect() {
2254        // Make a request to a closed port to get a connect error
2255        let err = reqwest::Client::new()
2256            .get("http://127.0.0.1:1")
2257            .send()
2258            .await
2259            .unwrap_err();
2260        let friendly = friendly_error(&err, "test-node:1");
2261        let msg = friendly.to_string();
2262        assert!(
2263            msg.contains("Could not connect"),
2264            "Expected connect message, got: {msg}"
2265        );
2266    }
2267
2268    #[tokio::test]
2269    async fn test_friendly_error_other() {
2270        // A request to an invalid URL creates a builder error, not a connect error
2271        let err = reqwest::Client::new()
2272            .get("http://[::invalid::url")
2273            .send()
2274            .await
2275            .unwrap_err();
2276        let friendly = friendly_error(&err, "bad-host");
2277        let msg = friendly.to_string();
2278        assert!(
2279            msg.contains("Network error"),
2280            "Expected network error message, got: {msg}"
2281        );
2282        assert!(msg.contains("bad-host"));
2283    }
2284
2285    // -- Auth helper tests --
2286
2287    #[test]
2288    fn test_is_localhost_variants() {
2289        assert!(is_localhost("localhost:7433"));
2290        assert!(is_localhost("127.0.0.1:7433"));
2291        assert!(is_localhost("[::1]:7433"));
2292        assert!(is_localhost("::1"));
2293        assert!(is_localhost("localhost"));
2294        assert!(!is_localhost("mac-mini:7433"));
2295        assert!(!is_localhost("192.168.1.100:7433"));
2296    }
2297
2298    #[test]
2299    fn test_authed_get_with_token() {
2300        let client = reqwest::Client::new();
2301        let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2302            .build()
2303            .unwrap();
2304        let auth = req
2305            .headers()
2306            .get("authorization")
2307            .unwrap()
2308            .to_str()
2309            .unwrap();
2310        assert_eq!(auth, "Bearer tok");
2311    }
2312
2313    #[test]
2314    fn test_authed_get_without_token() {
2315        let client = reqwest::Client::new();
2316        let req = authed_get(&client, "http://h:1/api".into(), None)
2317            .build()
2318            .unwrap();
2319        assert!(req.headers().get("authorization").is_none());
2320    }
2321
2322    #[test]
2323    fn test_authed_post_with_token() {
2324        let client = reqwest::Client::new();
2325        let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2326            .build()
2327            .unwrap();
2328        let auth = req
2329            .headers()
2330            .get("authorization")
2331            .unwrap()
2332            .to_str()
2333            .unwrap();
2334        assert_eq!(auth, "Bearer secret");
2335    }
2336
2337    #[test]
2338    fn test_authed_post_without_token() {
2339        let client = reqwest::Client::new();
2340        let req = authed_post(&client, "http://h:1/api".into(), None)
2341            .build()
2342            .unwrap();
2343        assert!(req.headers().get("authorization").is_none());
2344    }
2345
2346    #[test]
2347    fn test_authed_delete_with_token() {
2348        let client = reqwest::Client::new();
2349        let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2350            .build()
2351            .unwrap();
2352        let auth = req
2353            .headers()
2354            .get("authorization")
2355            .unwrap()
2356            .to_str()
2357            .unwrap();
2358        assert_eq!(auth, "Bearer del-tok");
2359    }
2360
2361    #[test]
2362    fn test_authed_delete_without_token() {
2363        let client = reqwest::Client::new();
2364        let req = authed_delete(&client, "http://h:1/api".into(), None)
2365            .build()
2366            .unwrap();
2367        assert!(req.headers().get("authorization").is_none());
2368    }
2369
2370    #[tokio::test]
2371    async fn test_resolve_token_explicit() {
2372        let client = reqwest::Client::new();
2373        let token =
2374            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2375        assert_eq!(token, Some("my-tok".into()));
2376    }
2377
2378    #[tokio::test]
2379    async fn test_resolve_token_remote_no_explicit() {
2380        let client = reqwest::Client::new();
2381        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2382        assert_eq!(token, None);
2383    }
2384
2385    #[tokio::test]
2386    async fn test_resolve_token_localhost_auto_discover() {
2387        use axum::{Json, Router, routing::get};
2388
2389        let app = Router::new().route(
2390            "/api/v1/auth/token",
2391            get(|| async {
2392                Json(AuthTokenResponse {
2393                    token: "discovered".into(),
2394                })
2395            }),
2396        );
2397        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2398        let addr = listener.local_addr().unwrap();
2399        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2400
2401        let node = format!("localhost:{}", addr.port());
2402        let base = base_url(&node);
2403        let client = reqwest::Client::new();
2404        let token = resolve_token(&client, &base, &node, None).await;
2405        assert_eq!(token, Some("discovered".into()));
2406    }
2407
2408    #[tokio::test]
2409    async fn test_discover_token_empty_returns_none() {
2410        use axum::{Json, Router, routing::get};
2411
2412        let app = Router::new().route(
2413            "/api/v1/auth/token",
2414            get(|| async {
2415                Json(AuthTokenResponse {
2416                    token: String::new(),
2417                })
2418            }),
2419        );
2420        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2421        let addr = listener.local_addr().unwrap();
2422        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2423
2424        let base = format!("http://127.0.0.1:{}", addr.port());
2425        let client = reqwest::Client::new();
2426        assert_eq!(discover_token(&client, &base).await, None);
2427    }
2428
2429    #[tokio::test]
2430    async fn test_discover_token_unreachable_returns_none() {
2431        let client = reqwest::Client::new();
2432        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2433    }
2434
2435    #[test]
2436    fn test_cli_parse_with_token() {
2437        let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
2438        assert_eq!(cli.token, Some("my-secret".into()));
2439    }
2440
2441    #[test]
2442    fn test_cli_parse_without_token() {
2443        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2444        assert_eq!(cli.token, None);
2445    }
2446
2447    #[tokio::test]
2448    async fn test_execute_with_explicit_token_sends_header() {
2449        use axum::{Router, extract::Request, http::StatusCode, routing::get};
2450
2451        let app = Router::new().route(
2452            "/api/v1/sessions",
2453            get(|req: Request| async move {
2454                let auth = req
2455                    .headers()
2456                    .get("authorization")
2457                    .and_then(|v| v.to_str().ok())
2458                    .unwrap_or("");
2459                assert_eq!(auth, "Bearer test-token");
2460                (StatusCode::OK, "[]".to_owned())
2461            }),
2462        );
2463        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2464        let addr = listener.local_addr().unwrap();
2465        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2466        let node = format!("127.0.0.1:{}", addr.port());
2467
2468        let cli = Cli {
2469            node,
2470            token: Some("test-token".into()),
2471            command: Commands::List,
2472        };
2473        let result = execute(&cli).await.unwrap();
2474        assert_eq!(result, "No sessions.");
2475    }
2476
2477    // -- Interventions tests --
2478
2479    #[test]
2480    fn test_cli_parse_interventions() {
2481        let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
2482        assert!(matches!(
2483            &cli.command,
2484            Commands::Interventions { name } if name == "my-session"
2485        ));
2486    }
2487
2488    #[test]
2489    fn test_format_interventions_empty() {
2490        assert_eq!(format_interventions(&[]), "No intervention events.");
2491    }
2492
2493    #[test]
2494    fn test_format_interventions_with_data() {
2495        let events = vec![
2496            InterventionEventResponse {
2497                id: 1,
2498                session_id: "sess-1".into(),
2499                reason: "Memory exceeded threshold".into(),
2500                created_at: "2026-01-01T00:00:00Z".into(),
2501            },
2502            InterventionEventResponse {
2503                id: 2,
2504                session_id: "sess-1".into(),
2505                reason: "Idle for 10 minutes".into(),
2506                created_at: "2026-01-02T00:00:00Z".into(),
2507            },
2508        ];
2509        let output = format_interventions(&events);
2510        assert!(output.contains("ID"));
2511        assert!(output.contains("TIMESTAMP"));
2512        assert!(output.contains("REASON"));
2513        assert!(output.contains("Memory exceeded threshold"));
2514        assert!(output.contains("Idle for 10 minutes"));
2515        assert!(output.contains("2026-01-01T00:00:00Z"));
2516    }
2517
2518    #[tokio::test]
2519    async fn test_execute_interventions_empty() {
2520        let node = start_test_server().await;
2521        let cli = Cli {
2522            node,
2523            token: None,
2524            command: Commands::Interventions {
2525                name: "my-session".into(),
2526            },
2527        };
2528        let result = execute(&cli).await.unwrap();
2529        assert_eq!(result, "No intervention events.");
2530    }
2531
2532    #[tokio::test]
2533    async fn test_execute_interventions_with_data() {
2534        use axum::{Router, routing::get};
2535
2536        let app = Router::new().route(
2537            "/api/v1/sessions/{id}/interventions",
2538            get(|| async {
2539                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2540                    .to_owned()
2541            }),
2542        );
2543        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2544        let addr = listener.local_addr().unwrap();
2545        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2546        let node = format!("127.0.0.1:{}", addr.port());
2547
2548        let cli = Cli {
2549            node,
2550            token: None,
2551            command: Commands::Interventions {
2552                name: "test".into(),
2553            },
2554        };
2555        let result = execute(&cli).await.unwrap();
2556        assert!(result.contains("OOM"));
2557        assert!(result.contains("2026-01-01T00:00:00Z"));
2558    }
2559
2560    #[tokio::test]
2561    async fn test_execute_interventions_connection_refused() {
2562        let cli = Cli {
2563            node: "localhost:1".into(),
2564            token: None,
2565            command: Commands::Interventions {
2566                name: "test".into(),
2567            },
2568        };
2569        let result = execute(&cli).await;
2570        let err = result.unwrap_err().to_string();
2571        assert!(err.contains("Could not connect to pulpod"));
2572    }
2573
2574    // -- Attach command tests --
2575
2576    #[test]
2577    fn test_build_attach_command() {
2578        let cmd = build_attach_command("my-session");
2579        assert_eq!(cmd.get_program(), "tmux");
2580        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2581        assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
2582    }
2583
2584    #[test]
2585    fn test_cli_parse_attach() {
2586        let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2587        assert!(matches!(
2588            &cli.command,
2589            Commands::Attach { name } if name == "my-session"
2590        ));
2591    }
2592
2593    #[test]
2594    fn test_cli_parse_attach_alias() {
2595        let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2596        assert!(matches!(
2597            &cli.command,
2598            Commands::Attach { name } if name == "my-session"
2599        ));
2600    }
2601
2602    #[tokio::test]
2603    async fn test_execute_attach_success() {
2604        let node = start_test_server().await;
2605        let cli = Cli {
2606            node,
2607            token: None,
2608            command: Commands::Attach {
2609                name: "test-session".into(),
2610            },
2611        };
2612        let result = execute(&cli).await.unwrap();
2613        assert!(result.contains("Detached from session test-session"));
2614    }
2615
2616    #[tokio::test]
2617    async fn test_execute_attach_with_backend_session_id() {
2618        use axum::{Router, routing::get};
2619        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","provider":"claude","prompt":"test","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2620        let app = Router::new().route(
2621            "/api/v1/sessions/{id}",
2622            get(move || async move { session_json.to_owned() }),
2623        );
2624        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2625        let addr = listener.local_addr().unwrap();
2626        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2627
2628        let cli = Cli {
2629            node: format!("127.0.0.1:{}", addr.port()),
2630            token: None,
2631            command: Commands::Attach {
2632                name: "my-session".into(),
2633            },
2634        };
2635        let result = execute(&cli).await.unwrap();
2636        assert!(result.contains("Detached from session my-session"));
2637    }
2638
2639    #[tokio::test]
2640    async fn test_execute_attach_connection_refused() {
2641        let cli = Cli {
2642            node: "localhost:1".into(),
2643            token: None,
2644            command: Commands::Attach {
2645                name: "test-session".into(),
2646            },
2647        };
2648        let result = execute(&cli).await;
2649        let err = result.unwrap_err().to_string();
2650        assert!(err.contains("Could not connect to pulpod"));
2651    }
2652
2653    #[tokio::test]
2654    async fn test_execute_attach_error_response() {
2655        use axum::{Router, http::StatusCode, routing::get};
2656        let app = Router::new().route(
2657            "/api/v1/sessions/{id}",
2658            get(|| async {
2659                (
2660                    StatusCode::NOT_FOUND,
2661                    r#"{"error":"session not found"}"#.to_owned(),
2662                )
2663            }),
2664        );
2665        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2666        let addr = listener.local_addr().unwrap();
2667        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2668
2669        let cli = Cli {
2670            node: format!("127.0.0.1:{}", addr.port()),
2671            token: None,
2672            command: Commands::Attach {
2673                name: "nonexistent".into(),
2674            },
2675        };
2676        let result = execute(&cli).await;
2677        let err = result.unwrap_err().to_string();
2678        assert!(err.contains("session not found"));
2679    }
2680
2681    // -- Alias parse tests --
2682
2683    #[test]
2684    fn test_cli_parse_alias_spawn() {
2685        let cli = Cli::try_parse_from(["pulpo", "s", "--workdir", "/tmp", "Do it"]).unwrap();
2686        assert!(matches!(&cli.command, Commands::Spawn { .. }));
2687    }
2688
2689    #[test]
2690    fn test_cli_parse_alias_list() {
2691        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2692        assert!(matches!(cli.command, Commands::List));
2693    }
2694
2695    #[test]
2696    fn test_cli_parse_alias_logs() {
2697        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2698        assert!(matches!(
2699            &cli.command,
2700            Commands::Logs { name, .. } if name == "my-session"
2701        ));
2702    }
2703
2704    #[test]
2705    fn test_cli_parse_alias_kill() {
2706        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2707        assert!(matches!(
2708            &cli.command,
2709            Commands::Kill { name } if name == "my-session"
2710        ));
2711    }
2712
2713    #[test]
2714    fn test_cli_parse_alias_delete() {
2715        let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2716        assert!(matches!(
2717            &cli.command,
2718            Commands::Delete { name } if name == "my-session"
2719        ));
2720    }
2721
2722    #[test]
2723    fn test_cli_parse_alias_resume() {
2724        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
2725        assert!(matches!(
2726            &cli.command,
2727            Commands::Resume { name } if name == "my-session"
2728        ));
2729    }
2730
2731    #[test]
2732    fn test_cli_parse_alias_nodes() {
2733        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
2734        assert!(matches!(cli.command, Commands::Nodes));
2735    }
2736
2737    #[test]
2738    fn test_cli_parse_alias_interventions() {
2739        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
2740        assert!(matches!(
2741            &cli.command,
2742            Commands::Interventions { name } if name == "my-session"
2743        ));
2744    }
2745
2746    #[test]
2747    fn test_api_error_json() {
2748        let err = api_error("{\"error\":\"session not found: foo\"}");
2749        assert_eq!(err.to_string(), "session not found: foo");
2750    }
2751
2752    #[test]
2753    fn test_api_error_plain_text() {
2754        let err = api_error("plain text error");
2755        assert_eq!(err.to_string(), "plain text error");
2756    }
2757
2758    // -- diff_output tests --
2759
2760    #[test]
2761    fn test_diff_output_empty_prev() {
2762        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
2763    }
2764
2765    #[test]
2766    fn test_diff_output_identical() {
2767        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
2768    }
2769
2770    #[test]
2771    fn test_diff_output_new_lines_appended() {
2772        let prev = "line1\nline2";
2773        let new = "line1\nline2\nline3\nline4";
2774        assert_eq!(diff_output(prev, new), "line3\nline4");
2775    }
2776
2777    #[test]
2778    fn test_diff_output_scrolled_window() {
2779        // Window of 3 lines: old lines scroll off top, new appear at bottom
2780        let prev = "line1\nline2\nline3";
2781        let new = "line2\nline3\nline4";
2782        assert_eq!(diff_output(prev, new), "line4");
2783    }
2784
2785    #[test]
2786    fn test_diff_output_completely_different() {
2787        let prev = "aaa\nbbb";
2788        let new = "xxx\nyyy";
2789        assert_eq!(diff_output(prev, new), "xxx\nyyy");
2790    }
2791
2792    #[test]
2793    fn test_diff_output_last_line_matches_but_overlap_fails() {
2794        // Last line of prev appears in new but preceding lines don't match
2795        let prev = "aaa\ncommon";
2796        let new = "zzz\ncommon\nnew_line";
2797        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
2798        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
2799        // Falls through, no verified overlap, so returns everything
2800        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
2801    }
2802
2803    #[test]
2804    fn test_diff_output_new_empty() {
2805        assert_eq!(diff_output("line1", ""), "");
2806    }
2807
2808    // -- follow_logs tests --
2809
2810    /// Start a test server that simulates evolving output and session status transitions.
2811    async fn start_follow_test_server() -> String {
2812        use axum::{Router, extract::Path, extract::Query, routing::get};
2813        use std::sync::Arc;
2814        use std::sync::atomic::{AtomicUsize, Ordering};
2815
2816        let call_count = Arc::new(AtomicUsize::new(0));
2817        let output_count = call_count.clone();
2818        let status_count = Arc::new(AtomicUsize::new(0));
2819        let status_count_inner = status_count.clone();
2820
2821        let app = Router::new()
2822            .route(
2823                "/api/v1/sessions/{id}/output",
2824                get(
2825                    move |_path: Path<String>,
2826                          _query: Query<std::collections::HashMap<String, String>>| {
2827                        let count = output_count.clone();
2828                        async move {
2829                            let n = count.fetch_add(1, Ordering::SeqCst);
2830                            let output = match n {
2831                                0 => "line1\nline2".to_owned(),
2832                                1 => "line1\nline2\nline3".to_owned(),
2833                                _ => "line2\nline3\nline4".to_owned(),
2834                            };
2835                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2836                        }
2837                    },
2838                ),
2839            )
2840            .route(
2841                "/api/v1/sessions/{id}",
2842                get(move |_path: Path<String>| {
2843                    let count = status_count_inner.clone();
2844                    async move {
2845                        let n = count.fetch_add(1, Ordering::SeqCst);
2846                        let status = if n < 2 { "running" } else { "completed" };
2847                        format!(
2848                            r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
2849                        )
2850                    }
2851                }),
2852            );
2853
2854        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2855        let addr = listener.local_addr().unwrap();
2856        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2857        format!("http://127.0.0.1:{}", addr.port())
2858    }
2859
2860    #[tokio::test]
2861    async fn test_follow_logs_polls_and_exits_on_completed() {
2862        let base = start_follow_test_server().await;
2863        let client = reqwest::Client::new();
2864        let mut buf = Vec::new();
2865
2866        follow_logs(&client, &base, "test", 100, None, &mut buf)
2867            .await
2868            .unwrap();
2869
2870        let output = String::from_utf8(buf).unwrap();
2871        // Should contain initial output + new lines
2872        assert!(output.contains("line1"));
2873        assert!(output.contains("line2"));
2874        assert!(output.contains("line3"));
2875        assert!(output.contains("line4"));
2876    }
2877
2878    #[tokio::test]
2879    async fn test_execute_logs_follow_success() {
2880        let base = start_follow_test_server().await;
2881        // Extract host:port from http://127.0.0.1:PORT
2882        let node = base.strip_prefix("http://").unwrap().to_owned();
2883
2884        let cli = Cli {
2885            node,
2886            token: None,
2887            command: Commands::Logs {
2888                name: "test".into(),
2889                lines: 100,
2890                follow: true,
2891            },
2892        };
2893        // execute() with follow writes to stdout and returns empty string
2894        let result = execute(&cli).await.unwrap();
2895        assert_eq!(result, "");
2896    }
2897
2898    #[tokio::test]
2899    async fn test_execute_logs_follow_connection_refused() {
2900        let cli = Cli {
2901            node: "localhost:1".into(),
2902            token: None,
2903            command: Commands::Logs {
2904                name: "test".into(),
2905                lines: 50,
2906                follow: true,
2907            },
2908        };
2909        let result = execute(&cli).await;
2910        let err = result.unwrap_err().to_string();
2911        assert!(
2912            err.contains("Could not connect to pulpod"),
2913            "Expected friendly error, got: {err}"
2914        );
2915    }
2916
2917    #[tokio::test]
2918    async fn test_follow_logs_exits_on_dead() {
2919        use axum::{Router, extract::Path, extract::Query, routing::get};
2920
2921        let app = Router::new()
2922            .route(
2923                "/api/v1/sessions/{id}/output",
2924                get(
2925                    |_path: Path<String>,
2926                     _query: Query<std::collections::HashMap<String, String>>| async {
2927                        r#"{"output":"some output"}"#.to_owned()
2928                    },
2929                ),
2930            )
2931            .route(
2932                "/api/v1/sessions/{id}",
2933                get(|_path: Path<String>| async {
2934                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2935                }),
2936            );
2937
2938        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2939        let addr = listener.local_addr().unwrap();
2940        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2941        let base = format!("http://127.0.0.1:{}", addr.port());
2942
2943        let client = reqwest::Client::new();
2944        let mut buf = Vec::new();
2945        follow_logs(&client, &base, "test", 100, None, &mut buf)
2946            .await
2947            .unwrap();
2948
2949        let output = String::from_utf8(buf).unwrap();
2950        assert!(output.contains("some output"));
2951    }
2952
2953    #[tokio::test]
2954    async fn test_follow_logs_exits_on_stale() {
2955        use axum::{Router, extract::Path, extract::Query, routing::get};
2956
2957        let app = Router::new()
2958            .route(
2959                "/api/v1/sessions/{id}/output",
2960                get(
2961                    |_path: Path<String>,
2962                     _query: Query<std::collections::HashMap<String, String>>| async {
2963                        r#"{"output":"stale output"}"#.to_owned()
2964                    },
2965                ),
2966            )
2967            .route(
2968                "/api/v1/sessions/{id}",
2969                get(|_path: Path<String>| async {
2970                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2971                }),
2972            );
2973
2974        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2975        let addr = listener.local_addr().unwrap();
2976        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2977        let base = format!("http://127.0.0.1:{}", addr.port());
2978
2979        let client = reqwest::Client::new();
2980        let mut buf = Vec::new();
2981        follow_logs(&client, &base, "test", 100, None, &mut buf)
2982            .await
2983            .unwrap();
2984
2985        let output = String::from_utf8(buf).unwrap();
2986        assert!(output.contains("stale output"));
2987    }
2988
2989    #[tokio::test]
2990    async fn test_execute_logs_follow_non_reqwest_error() {
2991        use axum::{Router, extract::Path, extract::Query, routing::get};
2992
2993        // Session status endpoint returns invalid JSON to trigger a serde error
2994        let app = Router::new()
2995            .route(
2996                "/api/v1/sessions/{id}/output",
2997                get(
2998                    |_path: Path<String>,
2999                     _query: Query<std::collections::HashMap<String, String>>| async {
3000                        r#"{"output":"initial"}"#.to_owned()
3001                    },
3002                ),
3003            )
3004            .route(
3005                "/api/v1/sessions/{id}",
3006                get(|_path: Path<String>| async { "not valid json".to_owned() }),
3007            );
3008
3009        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3010        let addr = listener.local_addr().unwrap();
3011        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3012        let node = format!("127.0.0.1:{}", addr.port());
3013
3014        let cli = Cli {
3015            node,
3016            token: None,
3017            command: Commands::Logs {
3018                name: "test".into(),
3019                lines: 100,
3020                follow: true,
3021            },
3022        };
3023        let err = execute(&cli).await.unwrap_err();
3024        // serde_json error, not a reqwest error — hits the Err(other) branch
3025        let msg = err.to_string();
3026        assert!(
3027            msg.contains("expected ident"),
3028            "Expected serde parse error, got: {msg}"
3029        );
3030    }
3031
3032    #[test]
3033    fn test_cli_parse_spawn_with_guardrails() {
3034        let cli = Cli::try_parse_from([
3035            "pulpo",
3036            "spawn",
3037            "--workdir",
3038            "/tmp",
3039            "--max-turns",
3040            "10",
3041            "--max-budget",
3042            "5.5",
3043            "--output-format",
3044            "json",
3045            "Do it",
3046        ])
3047        .unwrap();
3048        assert!(matches!(
3049            &cli.command,
3050            Commands::Spawn { max_turns, max_budget, output_format, .. }
3051                if *max_turns == Some(10) && *max_budget == Some(5.5)
3052                && output_format.as_deref() == Some("json")
3053        ));
3054    }
3055
3056    #[tokio::test]
3057    async fn test_fetch_session_status_connection_error() {
3058        let client = reqwest::Client::new();
3059        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3060        assert!(result.is_err());
3061    }
3062
3063    // -- Crontab wrapper tests --
3064
3065    #[test]
3066    fn test_build_crontab_line() {
3067        let line = build_crontab_line(
3068            "nightly-review",
3069            "0 3 * * *",
3070            "/home/me/repo",
3071            "claude",
3072            "Review PRs",
3073            "localhost:7433",
3074        );
3075        assert_eq!(
3076            line,
3077            "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n"
3078        );
3079    }
3080
3081    #[test]
3082    fn test_crontab_install_success() {
3083        let crontab = "# existing cron\n0 * * * * echo hi\n";
3084        let line = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";
3085        let result = crontab_install(crontab, "my-job", line).unwrap();
3086        assert!(result.starts_with("# existing cron\n"));
3087        assert!(result.ends_with("#pulpo:my-job\n"));
3088        assert!(result.contains("echo hi"));
3089    }
3090
3091    #[test]
3092    fn test_crontab_install_duplicate_error() {
3093        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3094        let line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
3095        let err = crontab_install(crontab, "my-job", line).unwrap_err();
3096        assert!(err.to_string().contains("already exists"));
3097    }
3098
3099    #[test]
3100    fn test_crontab_list_empty() {
3101        assert_eq!(crontab_list(""), "No pulpo schedules.");
3102    }
3103
3104    #[test]
3105    fn test_crontab_list_no_pulpo_entries() {
3106        assert_eq!(crontab_list("0 * * * * echo hi\n"), "No pulpo schedules.");
3107    }
3108
3109    #[test]
3110    fn test_crontab_list_with_entries() {
3111        let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
3112        let output = crontab_list(crontab);
3113        assert!(output.contains("NAME"));
3114        assert!(output.contains("CRON"));
3115        assert!(output.contains("PAUSED"));
3116        assert!(output.contains("nightly"));
3117        assert!(output.contains("0 3 * * *"));
3118        assert!(output.contains("no"));
3119    }
3120
3121    #[test]
3122    fn test_crontab_list_paused_entry() {
3123        let crontab = "#0 3 * * * pulpo spawn task #pulpo:paused-job\n";
3124        let output = crontab_list(crontab);
3125        assert!(output.contains("paused-job"));
3126        assert!(output.contains("yes"));
3127    }
3128
3129    #[test]
3130    fn test_crontab_list_short_line() {
3131        // A line with fewer than 5 space-separated parts but still tagged
3132        let crontab = "badcron #pulpo:broken\n";
3133        let output = crontab_list(crontab);
3134        assert!(output.contains("broken"));
3135        assert!(output.contains('?'));
3136    }
3137
3138    #[test]
3139    fn test_crontab_remove_success() {
3140        let crontab = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
3141        let result = crontab_remove(crontab, "my-job").unwrap();
3142        assert!(result.contains("echo hi"));
3143        assert!(!result.contains("my-job"));
3144    }
3145
3146    #[test]
3147    fn test_crontab_remove_not_found() {
3148        let crontab = "0 * * * * echo hi\n";
3149        let err = crontab_remove(crontab, "ghost").unwrap_err();
3150        assert!(err.to_string().contains("not found"));
3151    }
3152
3153    #[test]
3154    fn test_crontab_pause_success() {
3155        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3156        let result = crontab_pause(crontab, "my-job").unwrap();
3157        assert!(result.starts_with('#'));
3158        assert!(result.contains("#pulpo:my-job"));
3159    }
3160
3161    #[test]
3162    fn test_crontab_pause_not_found() {
3163        let crontab = "0 * * * * echo hi\n";
3164        let err = crontab_pause(crontab, "ghost").unwrap_err();
3165        assert!(err.to_string().contains("not found or already paused"));
3166    }
3167
3168    #[test]
3169    fn test_crontab_pause_already_paused() {
3170        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3171        let err = crontab_pause(crontab, "my-job").unwrap_err();
3172        assert!(err.to_string().contains("already paused"));
3173    }
3174
3175    #[test]
3176    fn test_crontab_resume_success() {
3177        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3178        let result = crontab_resume(crontab, "my-job").unwrap();
3179        assert!(!result.starts_with('#'));
3180        assert!(result.contains("#pulpo:my-job"));
3181    }
3182
3183    #[test]
3184    fn test_crontab_resume_not_found() {
3185        let crontab = "0 * * * * echo hi\n";
3186        let err = crontab_resume(crontab, "ghost").unwrap_err();
3187        assert!(err.to_string().contains("not found or not paused"));
3188    }
3189
3190    #[test]
3191    fn test_crontab_resume_not_paused() {
3192        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3193        let err = crontab_resume(crontab, "my-job").unwrap_err();
3194        assert!(err.to_string().contains("not paused"));
3195    }
3196
3197    // -- Schedule CLI parse tests --
3198
3199    #[test]
3200    fn test_cli_parse_schedule_install() {
3201        let cli = Cli::try_parse_from([
3202            "pulpo",
3203            "schedule",
3204            "install",
3205            "nightly",
3206            "0 3 * * *",
3207            "--workdir",
3208            "/tmp/repo",
3209            "Review",
3210            "PRs",
3211        ])
3212        .unwrap();
3213        assert!(matches!(
3214            &cli.command,
3215            Commands::Schedule {
3216                action: ScheduleAction::Install { name, cron, workdir, provider, prompt }
3217            } if name == "nightly" && cron == "0 3 * * *" && workdir == "/tmp/repo"
3218              && provider == "claude" && prompt == &["Review", "PRs"]
3219        ));
3220    }
3221
3222    #[test]
3223    fn test_cli_parse_schedule_list() {
3224        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3225        assert!(matches!(
3226            &cli.command,
3227            Commands::Schedule {
3228                action: ScheduleAction::List
3229            }
3230        ));
3231    }
3232
3233    #[test]
3234    fn test_cli_parse_schedule_remove() {
3235        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3236        assert!(matches!(
3237            &cli.command,
3238            Commands::Schedule {
3239                action: ScheduleAction::Remove { name }
3240            } if name == "nightly"
3241        ));
3242    }
3243
3244    #[test]
3245    fn test_cli_parse_schedule_pause() {
3246        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3247        assert!(matches!(
3248            &cli.command,
3249            Commands::Schedule {
3250                action: ScheduleAction::Pause { name }
3251            } if name == "nightly"
3252        ));
3253    }
3254
3255    #[test]
3256    fn test_cli_parse_schedule_resume() {
3257        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3258        assert!(matches!(
3259            &cli.command,
3260            Commands::Schedule {
3261                action: ScheduleAction::Resume { name }
3262            } if name == "nightly"
3263        ));
3264    }
3265
3266    #[test]
3267    fn test_cli_parse_schedule_alias() {
3268        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3269        assert!(matches!(
3270            &cli.command,
3271            Commands::Schedule {
3272                action: ScheduleAction::List
3273            }
3274        ));
3275    }
3276
3277    #[test]
3278    fn test_cli_parse_schedule_list_alias() {
3279        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3280        assert!(matches!(
3281            &cli.command,
3282            Commands::Schedule {
3283                action: ScheduleAction::List
3284            }
3285        ));
3286    }
3287
3288    #[test]
3289    fn test_cli_parse_schedule_remove_alias() {
3290        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3291        assert!(matches!(
3292            &cli.command,
3293            Commands::Schedule {
3294                action: ScheduleAction::Remove { name }
3295            } if name == "nightly"
3296        ));
3297    }
3298
3299    #[test]
3300    fn test_cli_parse_schedule_install_custom_provider() {
3301        let cli = Cli::try_parse_from([
3302            "pulpo",
3303            "schedule",
3304            "install",
3305            "daily",
3306            "0 9 * * *",
3307            "--workdir",
3308            "/tmp",
3309            "--provider",
3310            "codex",
3311            "Run tests",
3312        ])
3313        .unwrap();
3314        assert!(matches!(
3315            &cli.command,
3316            Commands::Schedule {
3317                action: ScheduleAction::Install { provider, .. }
3318            } if provider == "codex"
3319        ));
3320    }
3321
3322    #[tokio::test]
3323    async fn test_execute_schedule_via_execute() {
3324        // Under coverage builds, execute_schedule is a stub returning Ok("")
3325        let node = start_test_server().await;
3326        let cli = Cli {
3327            node,
3328            token: None,
3329            command: Commands::Schedule {
3330                action: ScheduleAction::List,
3331            },
3332        };
3333        let result = execute(&cli).await;
3334        // Under coverage: succeeds with empty string; under non-coverage: may fail (no crontab)
3335        assert!(result.is_ok() || result.is_err());
3336    }
3337
3338    #[test]
3339    fn test_schedule_action_debug() {
3340        let action = ScheduleAction::List;
3341        assert_eq!(format!("{action:?}"), "List");
3342    }
3343
3344    // ── Knowledge CLI tests ─────────────────────────────────────────────
3345
3346    #[test]
3347    fn test_cli_parse_knowledge() {
3348        let cli = Cli::try_parse_from(["pulpo", "knowledge"]).unwrap();
3349        assert!(matches!(cli.command, Commands::Knowledge { .. }));
3350    }
3351
3352    #[test]
3353    fn test_cli_parse_knowledge_alias() {
3354        let cli = Cli::try_parse_from(["pulpo", "kn"]).unwrap();
3355        assert!(matches!(cli.command, Commands::Knowledge { .. }));
3356    }
3357
3358    #[test]
3359    fn test_cli_parse_knowledge_with_filters() {
3360        let cli = Cli::try_parse_from([
3361            "pulpo",
3362            "knowledge",
3363            "--kind",
3364            "failure",
3365            "--repo",
3366            "/tmp/repo",
3367            "--ink",
3368            "coder",
3369            "--limit",
3370            "5",
3371        ])
3372        .unwrap();
3373        match &cli.command {
3374            Commands::Knowledge {
3375                kind,
3376                repo,
3377                ink,
3378                limit,
3379                ..
3380            } => {
3381                assert_eq!(kind.as_deref(), Some("failure"));
3382                assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3383                assert_eq!(ink.as_deref(), Some("coder"));
3384                assert_eq!(*limit, 5);
3385            }
3386            _ => panic!("expected Knowledge command"),
3387        }
3388    }
3389
3390    #[test]
3391    fn test_cli_parse_knowledge_context() {
3392        let cli = Cli::try_parse_from(["pulpo", "knowledge", "--context", "--repo", "/tmp/repo"])
3393            .unwrap();
3394        match &cli.command {
3395            Commands::Knowledge { context, repo, .. } => {
3396                assert!(*context);
3397                assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3398            }
3399            _ => panic!("expected Knowledge command"),
3400        }
3401    }
3402
3403    #[test]
3404    fn test_format_knowledge_empty() {
3405        assert_eq!(format_knowledge(&[]), "No knowledge found.");
3406    }
3407
3408    #[test]
3409    fn test_format_knowledge_items() {
3410        use chrono::Utc;
3411        use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3412        use uuid::Uuid;
3413
3414        let items = vec![
3415            Knowledge {
3416                id: Uuid::new_v4(),
3417                session_id: Uuid::new_v4(),
3418                kind: KnowledgeKind::Summary,
3419                scope_repo: Some("/tmp/repo".into()),
3420                scope_ink: Some("coder".into()),
3421                title: "Fixed the auth bug".into(),
3422                body: "Details".into(),
3423                tags: vec!["claude".into(), "completed".into()],
3424                relevance: 0.7,
3425                created_at: Utc::now(),
3426            },
3427            Knowledge {
3428                id: Uuid::new_v4(),
3429                session_id: Uuid::new_v4(),
3430                kind: KnowledgeKind::Failure,
3431                scope_repo: None,
3432                scope_ink: None,
3433                title: "OOM crash during build".into(),
3434                body: "Details".into(),
3435                tags: vec!["failure".into()],
3436                relevance: 0.9,
3437                created_at: Utc::now(),
3438            },
3439        ];
3440
3441        let output = format_knowledge(&items);
3442        assert!(output.contains("KIND"));
3443        assert!(output.contains("TITLE"));
3444        assert!(output.contains("summary"));
3445        assert!(output.contains("failure"));
3446        assert!(output.contains("Fixed the auth bug"));
3447        assert!(output.contains("repo"));
3448        assert!(output.contains("0.70"));
3449    }
3450
3451    #[test]
3452    fn test_format_knowledge_long_title_truncated() {
3453        use chrono::Utc;
3454        use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3455        use uuid::Uuid;
3456
3457        let items = vec![Knowledge {
3458            id: Uuid::new_v4(),
3459            session_id: Uuid::new_v4(),
3460            kind: KnowledgeKind::Summary,
3461            scope_repo: Some("/repo".into()),
3462            scope_ink: None,
3463            title: "A very long title that exceeds the maximum display width for knowledge items in the CLI".into(),
3464            body: "Body".into(),
3465            tags: vec![],
3466            relevance: 0.5,
3467            created_at: Utc::now(),
3468        }];
3469
3470        let output = format_knowledge(&items);
3471        assert!(output.contains('…'));
3472    }
3473
3474    #[test]
3475    fn test_cli_parse_knowledge_get() {
3476        let cli = Cli::try_parse_from(["pulpo", "knowledge", "--get", "abc-123"]).unwrap();
3477        match &cli.command {
3478            Commands::Knowledge { get, .. } => {
3479                assert_eq!(get.as_deref(), Some("abc-123"));
3480            }
3481            _ => panic!("expected Knowledge command"),
3482        }
3483    }
3484
3485    #[test]
3486    fn test_cli_parse_knowledge_delete() {
3487        let cli = Cli::try_parse_from(["pulpo", "knowledge", "--delete", "abc-123"]).unwrap();
3488        match &cli.command {
3489            Commands::Knowledge { delete, .. } => {
3490                assert_eq!(delete.as_deref(), Some("abc-123"));
3491            }
3492            _ => panic!("expected Knowledge command"),
3493        }
3494    }
3495
3496    #[test]
3497    fn test_cli_parse_knowledge_push() {
3498        let cli = Cli::try_parse_from(["pulpo", "knowledge", "--push"]).unwrap();
3499        match &cli.command {
3500            Commands::Knowledge { push, .. } => {
3501                assert!(*push);
3502            }
3503            _ => panic!("expected Knowledge command"),
3504        }
3505    }
3506
3507    #[test]
3508    fn test_format_knowledge_no_repo() {
3509        use chrono::Utc;
3510        use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3511        use uuid::Uuid;
3512
3513        let items = vec![Knowledge {
3514            id: Uuid::new_v4(),
3515            session_id: Uuid::new_v4(),
3516            kind: KnowledgeKind::Summary,
3517            scope_repo: None,
3518            scope_ink: None,
3519            title: "Global finding".into(),
3520            body: "Body".into(),
3521            tags: vec![],
3522            relevance: 0.5,
3523            created_at: Utc::now(),
3524        }];
3525
3526        let output = format_knowledge(&items);
3527        assert!(output.contains('-')); // "-" for no repo
3528    }
3529}