Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4    AuthTokenResponse, CreateSessionResponse, InterventionEventResponse, KnowledgeDeleteResponse,
5    KnowledgeItemResponse, KnowledgePushResponse, KnowledgeResponse, PeersResponse,
6};
7use pulpo_common::knowledge::Knowledge;
8use pulpo_common::session::Session;
9
// Top-level command-line arguments, parsed by clap's derive macros.
// NOTE: the `///` comments on the fields below double as the `--help` text clap
// shows to users, so treat them as user-facing strings when editing.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version
)]
pub struct Cli {
    /// Target node (default: localhost)
    // A `host:port` pair; `base_url` prefixes it with `http://`.
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    /// Auth token (auto-discovered from local daemon if omitted)
    // See `resolve_token`: discovery is only attempted when `node` is localhost.
    #[arg(long)]
    pub token: Option<String>,

    // The action to run; dispatched in `execute`.
    #[command(subcommand)]
    pub command: Commands,
}
28
// Every top-level subcommand of the CLI.
// NOTE: the `///` comments on variants and fields are rendered by clap as help
// text, so they are user-facing strings rather than plain documentation.
#[derive(Subcommand, Debug)]
// `Spawn` carries many optional fields and is far larger than the other variants;
// the size lint is silenced instead of boxing the variant.
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    /// Attach to a session's terminal
    #[command(visible_alias = "a")]
    Attach {
        /// Session name or ID
        name: String,
    },

    /// Send input to a session
    #[command(visible_alias = "i")]
    Input {
        /// Session name or ID
        name: String,
        /// Text to send (sends Enter if omitted)
        // `execute` substitutes "\n" when this is `None`.
        text: Option<String>,
    },

    /// Spawn a new agent session
    #[command(visible_alias = "s")]
    Spawn {
        /// Working directory (defaults to current directory)
        #[arg(long)]
        workdir: Option<String>,

        /// Session name (auto-generated if omitted)
        #[arg(long)]
        name: Option<String>,

        /// Agent provider (claude, codex, gemini, opencode). Uses config `default_provider` or claude.
        #[arg(long)]
        provider: Option<String>,

        /// Run in autonomous mode (fire-and-forget)
        // Sent to the API as mode "autonomous"; otherwise "interactive".
        #[arg(long)]
        auto: bool,

        /// Disable all safety guardrails (Claude, Gemini)
        #[arg(long)]
        unrestricted: bool,

        /// Model override, e.g. opus, sonnet (Claude, Codex, Gemini)
        #[arg(long)]
        model: Option<String>,

        /// System prompt to append (Claude only)
        #[arg(long)]
        system_prompt: Option<String>,

        /// Explicit allowed tools, comma-separated (Claude only)
        #[arg(long, value_delimiter = ',')]
        allowed_tools: Option<Vec<String>>,

        /// Ink name (from config)
        #[arg(long)]
        ink: Option<String>,

        /// Maximum agent turns before stopping (Claude only)
        #[arg(long)]
        max_turns: Option<u32>,

        /// Maximum budget in USD before stopping (Claude only)
        // Sent to the API under the key `max_budget_usd`.
        #[arg(long)]
        max_budget: Option<f64>,

        /// Output format, e.g. json, stream-json (Claude, Gemini, opencode)
        #[arg(long)]
        output_format: Option<String>,

        /// Task prompt
        // Positional words; `execute` joins them with single spaces.
        prompt: Vec<String>,
    },

    /// List all sessions
    #[command(visible_alias = "ls")]
    List,

    /// Show session logs/output
    #[command(visible_alias = "l")]
    Logs {
        /// Session name or ID
        name: String,

        /// Number of lines to fetch
        #[arg(long, default_value = "100")]
        lines: usize,

        /// Follow output (like `tail -f`)
        #[arg(short, long)]
        follow: bool,
    },

    /// Kill a session
    #[command(visible_alias = "k")]
    Kill {
        /// Session name or ID
        name: String,
    },

    /// Permanently remove a session from history
    #[command(visible_alias = "rm")]
    Delete {
        /// Session name or ID
        name: String,
    },

    /// Resume a stale session
    #[command(visible_alias = "r")]
    Resume {
        /// Session name or ID
        name: String,
    },

    /// List all known nodes
    #[command(visible_alias = "n")]
    Nodes,

    /// Show intervention history for a session
    #[command(visible_alias = "iv")]
    Interventions {
        /// Session name or ID
        name: String,
    },

    /// Open the web dashboard in your browser
    Ui,

    /// Query extracted knowledge from past sessions
    #[command(visible_alias = "kn")]
    Knowledge {
        /// Filter by session ID
        #[arg(long)]
        session: Option<String>,

        /// Filter by kind (summary, failure)
        #[arg(long)]
        kind: Option<String>,

        /// Filter by repo/workdir
        #[arg(long)]
        repo: Option<String>,

        /// Filter by ink name
        #[arg(long)]
        ink: Option<String>,

        /// Maximum results
        #[arg(long, default_value = "20")]
        limit: usize,

        /// Context mode: find relevant knowledge for a workdir
        #[arg(long)]
        context: bool,

        /// Get a single knowledge item by ID
        #[arg(long)]
        get: Option<String>,

        /// Delete a knowledge item by ID
        #[arg(long)]
        delete: Option<String>,

        /// Push local knowledge to configured remote
        #[arg(long)]
        push: bool,
    },

    /// Manage scheduled agent runs via crontab
    #[command(visible_alias = "sched")]
    Schedule {
        #[command(subcommand)]
        action: ScheduleAction,
    },
}
204
// Actions of the `schedule` subcommand; each one is carried out by
// `execute_schedule` against the user's crontab.
// NOTE: the `///` comments below are rendered by clap as help text.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    /// Install a cron schedule that spawns a session
    Install {
        /// Schedule name
        // Stored after the `#pulpo:` tag at the end of the crontab line.
        name: String,
        /// Cron expression (e.g. "0 3 * * *")
        cron: String,
        /// Working directory
        #[arg(long)]
        workdir: String,
        /// Agent provider
        #[arg(long, default_value = "claude")]
        provider: String,
        /// Task prompt
        // Positional words; joined with single spaces before being written out.
        prompt: Vec<String>,
    },
    /// List installed pulpo cron schedules
    // `alias` (unlike `visible_alias` used on `Commands`) keeps "ls" out of the help output.
    #[command(alias = "ls")]
    List,
    /// Remove a cron schedule
    #[command(alias = "rm")]
    Remove {
        /// Schedule name
        name: String,
    },
    /// Pause a cron schedule (comments out the line)
    Pause {
        /// Schedule name
        name: String,
    },
    /// Resume a paused cron schedule (uncomments the line)
    Resume {
        /// Schedule name
        name: String,
    },
}
242
/// Turn a `host:port` node address into the HTTP base URL of its API.
///
/// The address is used verbatim; only the `http://` scheme is prepended.
pub fn base_url(node: &str) -> String {
    ["http://", node].concat()
}
247
/// Response shape for the output endpoint.
///
/// `fetch_output` deserializes the body of
/// `GET /api/v1/sessions/{name}/output` into this struct.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// Captured session output, handed back to the caller unchanged.
    output: String,
}
253
254/// Format a list of sessions as a table.
255fn format_sessions(sessions: &[Session]) -> String {
256    if sessions.is_empty() {
257        return "No sessions.".into();
258    }
259    let mut lines = vec![format!(
260        "{:<20} {:<12} {:<10} {:<14} {}",
261        "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
262    )];
263    for s in sessions {
264        let prompt_display = if s.prompt.len() > 40 {
265            format!("{}...", &s.prompt[..37])
266        } else {
267            s.prompt.clone()
268        };
269        let status_display = if s.waiting_for_input {
270            "waiting".to_owned()
271        } else {
272            s.status.to_string()
273        };
274        lines.push(format!(
275            "{:<20} {:<12} {:<10} {:<14} {}",
276            s.name, status_display, s.provider, s.mode, prompt_display
277        ));
278    }
279    lines.join("\n")
280}
281
282/// Format the peers response as a table.
283fn format_nodes(resp: &PeersResponse) -> String {
284    let mut lines = vec![format!(
285        "{:<20} {:<25} {:<10} {}",
286        "NAME", "ADDRESS", "STATUS", "SESSIONS"
287    )];
288    lines.push(format!(
289        "{:<20} {:<25} {:<10} {}",
290        resp.local.name, "(local)", "online", "-"
291    ));
292    for p in &resp.peers {
293        let sessions = p
294            .session_count
295            .map_or_else(|| "-".into(), |c| c.to_string());
296        lines.push(format!(
297            "{:<20} {:<25} {:<10} {}",
298            p.name, p.address, p.status, sessions
299        ));
300    }
301    lines.join("\n")
302}
303
304/// Format intervention events as a table.
305fn format_interventions(events: &[InterventionEventResponse]) -> String {
306    if events.is_empty() {
307        return "No intervention events.".into();
308    }
309    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
310    for e in events {
311        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
312    }
313    lines.join("\n")
314}
315
316/// Format knowledge items as a table.
317fn format_knowledge(items: &[Knowledge]) -> String {
318    if items.is_empty() {
319        return "No knowledge found.".into();
320    }
321    let mut lines = vec![format!(
322        "{:<10} {:<40} {:<10} {:<6} {}",
323        "KIND", "TITLE", "REPO", "REL", "TAGS"
324    )];
325    for k in items {
326        let title = if k.title.len() > 38 {
327            format!("{}…", &k.title[..37])
328        } else {
329            k.title.clone()
330        };
331        let repo = k
332            .scope_repo
333            .as_deref()
334            .and_then(|r| r.rsplit('/').next())
335            .unwrap_or("-");
336        let tags = k.tags.join(",");
337        lines.push(format!(
338            "{:<10} {:<40} {:<10} {:<6.2} {}",
339            k.kind, title, repo, k.relevance, tags
340        ));
341    }
342    lines.join("\n")
343}
344
/// Build (without running) the command that opens `url` in the default browser.
///
/// macOS uses `open`; Linux and, as a fallback, every other platform use
/// `xdg-open`. The URL is passed as the single argument.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    let opener = if cfg!(target_os = "macos") {
        "open"
    } else {
        // Linux proper, plus the best-effort fallback for anything else.
        "xdg-open"
    };
    let mut command = std::process::Command::new(opener);
    command.arg(url);
    command
}
368
369/// Open a URL in the default browser.
370#[cfg(not(coverage))]
371fn open_browser(url: &str) -> Result<()> {
372    build_open_command(url).status()?;
373    Ok(())
374}
375
/// Stub for coverage builds — avoids opening a browser during tests.
///
/// Ignores `_url` and always returns `Ok(())`; it keeps the same signature as the
/// real `open_browser` so callers compile unchanged under `cfg(coverage)`.
#[cfg(coverage)]
fn open_browser(_url: &str) -> Result<()> {
    Ok(())
}
381
/// Build (without running) the `tmux attach-session -t <id>` command.
///
/// `backend_session_id` is the backend session ID (e.g. `pulpo-my-session`),
/// which is the tmux session name.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    let mut tmux = std::process::Command::new("tmux");
    tmux.arg("attach-session").arg("-t").arg(backend_session_id);
    tmux
}
390
391/// Attach to a session's terminal.
392#[cfg(not(any(test, coverage)))]
393fn attach_session(backend_session_id: &str) -> Result<()> {
394    let status = build_attach_command(backend_session_id).status()?;
395    if !status.success() {
396        anyhow::bail!("attach failed with {status}");
397    }
398    Ok(())
399}
400
/// Stub for test and coverage builds — avoids spawning real terminals during tests.
///
/// Ignores the session ID and always returns `Ok(())`.
#[cfg(any(test, coverage))]
// The `Result` wrapper and non-`const` signature are kept on purpose so this stub
// matches the real `attach_session`; the lints would otherwise flag both.
#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    Ok(())
}
407
408/// Extract a clean error message from an API JSON response (or fall back to raw text).
409fn api_error(text: &str) -> anyhow::Error {
410    serde_json::from_str::<serde_json::Value>(text)
411        .ok()
412        .and_then(|v| v["error"].as_str().map(String::from))
413        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
414}
415
416/// Return the response body text, or a clean error if the response was non-success.
417async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
418    if resp.status().is_success() {
419        Ok(resp.text().await?)
420    } else {
421        let text = resp.text().await?;
422        Err(api_error(&text))
423    }
424}
425
426/// Map a reqwest error to a user-friendly message.
427fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
428    if err.is_connect() {
429        anyhow::anyhow!(
430            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
431        )
432    } else {
433        anyhow::anyhow!("Network error connecting to {node}: {err}")
434    }
435}
436
/// Report whether a node address refers to the local machine.
///
/// Recognizes `localhost` and `127.0.0.1` (with or without a `:port` suffix),
/// the bare IPv6 loopback `::1`, and anything starting with the bracketed form
/// `[::1]`.
fn is_localhost(node: &str) -> bool {
    // IPv6 loopback is checked on the whole address because it contains colons itself.
    if node == "::1" || node.starts_with("[::1]") {
        return true;
    }
    // Everything before the first ':' is the host part.
    let host = match node.find(':') {
        Some(colon) => &node[..colon],
        None => node,
    };
    matches!(host, "localhost" | "127.0.0.1")
}
442
443/// Try to auto-discover the auth token from a local daemon.
444async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
445    let resp = client
446        .get(format!("{base}/api/v1/auth/token"))
447        .send()
448        .await
449        .ok()?;
450    let body: AuthTokenResponse = resp.json().await.ok()?;
451    if body.token.is_empty() {
452        None
453    } else {
454        Some(body.token)
455    }
456}
457
458/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
459async fn resolve_token(
460    client: &reqwest::Client,
461    base: &str,
462    node: &str,
463    explicit: Option<&str>,
464) -> Option<String> {
465    if let Some(t) = explicit {
466        return Some(t.to_owned());
467    }
468    if is_localhost(node) {
469        return discover_token(client, base).await;
470    }
471    None
472}
473
474/// Build an authenticated GET request.
475fn authed_get(
476    client: &reqwest::Client,
477    url: String,
478    token: Option<&str>,
479) -> reqwest::RequestBuilder {
480    let req = client.get(url);
481    if let Some(t) = token {
482        req.bearer_auth(t)
483    } else {
484        req
485    }
486}
487
488/// Build an authenticated POST request.
489fn authed_post(
490    client: &reqwest::Client,
491    url: String,
492    token: Option<&str>,
493) -> reqwest::RequestBuilder {
494    let req = client.post(url);
495    if let Some(t) = token {
496        req.bearer_auth(t)
497    } else {
498        req
499    }
500}
501
502/// Build an authenticated DELETE request.
503fn authed_delete(
504    client: &reqwest::Client,
505    url: String,
506    token: Option<&str>,
507) -> reqwest::RequestBuilder {
508    let req = client.delete(url);
509    if let Some(t) = token {
510        req.bearer_auth(t)
511    } else {
512        req
513    }
514}
515
516/// Fetch session output from the API.
517async fn fetch_output(
518    client: &reqwest::Client,
519    base: &str,
520    name: &str,
521    lines: usize,
522    token: Option<&str>,
523) -> Result<String> {
524    let resp = authed_get(
525        client,
526        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
527        token,
528    )
529    .send()
530    .await?;
531    let text = ok_or_api_error(resp).await?;
532    let output: OutputResponse = serde_json::from_str(&text)?;
533    Ok(output.output)
534}
535
536/// Fetch session status from the API.
537async fn fetch_session_status(
538    client: &reqwest::Client,
539    base: &str,
540    name: &str,
541    token: Option<&str>,
542) -> Result<String> {
543    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
544        .send()
545        .await?;
546    let text = ok_or_api_error(resp).await?;
547    let session: Session = serde_json::from_str(&text)?;
548    Ok(session.status.to_string())
549}
550
/// Compute the new trailing lines that differ from the previous output.
///
/// The output endpoint returns the last N lines from the terminal pane. As new lines
/// appear, old lines at the top scroll off. We find the overlap between the end
/// of `prev` and the beginning-to-middle of `new`, then return only the truly new
/// trailing lines.
///
/// Returns all of `new` when `prev` is empty or when no overlap can be found, and
/// `""` when `new` adds nothing after the overlap. The returned slice borrows from
/// `new` and always starts at the beginning of a line, for both `\n` and `\r\n`
/// line endings.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // A non-empty `prev` always yields at least one line; fall back to printing
    // everything rather than panicking should that ever not hold.
    let last_prev = match prev_lines.last() {
        Some(&line) => line,
        None => return new,
    };

    // Scan `new` from the bottom for the last line of `prev`: the latest
    // candidate is the most likely overlap boundary.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }

        // Verify contiguous overlap: the lines leading up to the candidate must
        // match the tail of `prev` as well.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail != new_overlap {
            continue;
        }

        if i + 1 == new_lines.len() {
            // The overlap reaches the end of `new`: nothing new to print.
            return "";
        }

        // Byte offset just past line `i`, measured on the raw text so that the
        // real terminator length (`\n` or `\r\n`) is counted. `lines()` strips the
        // terminator, so summing `len() + 1` would come up short on CRLF input.
        let consumed: usize = new
            .split_inclusive('\n')
            .take(i + 1)
            .map(str::len)
            .sum();
        return new.get(consumed..).unwrap_or("");
    }

    // No overlap found — output changed completely, print it all
    new
}
593
594/// Follow logs by polling, printing only new output. Returns when the session ends.
595async fn follow_logs(
596    client: &reqwest::Client,
597    base: &str,
598    name: &str,
599    lines: usize,
600    token: Option<&str>,
601    writer: &mut (dyn std::io::Write + Send),
602) -> Result<()> {
603    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
604    write!(writer, "{prev_output}")?;
605
606    loop {
607        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
608
609        // Check session status
610        let status = fetch_session_status(client, base, name, token).await?;
611        let is_terminal = status == "completed" || status == "dead" || status == "stale";
612
613        // Fetch latest output
614        let new_output = fetch_output(client, base, name, lines, token).await?;
615
616        let diff = diff_output(&prev_output, &new_output);
617        if !diff.is_empty() {
618            write!(writer, "{diff}")?;
619        }
620        prev_output = new_output;
621
622        if is_terminal {
623            break;
624        }
625    }
626    Ok(())
627}
628
629// --- Crontab wrapper ---
630
/// Marker that precedes the schedule name in the trailing comment of every
/// crontab line written by `build_crontab_line` (e.g. `... #pulpo:nightly`).
/// The `crontab_*` helpers search for it to find pulpo's own entries.
#[cfg_attr(coverage, allow(dead_code))]
const CRONTAB_TAG: &str = "#pulpo:";
633
634/// Read the current crontab. Returns empty string if no crontab exists.
635#[cfg(not(coverage))]
636fn read_crontab() -> Result<String> {
637    let output = std::process::Command::new("crontab").arg("-l").output()?;
638    if output.status.success() {
639        Ok(String::from_utf8_lossy(&output.stdout).to_string())
640    } else {
641        Ok(String::new())
642    }
643}
644
645/// Write the given content as the user's crontab.
646#[cfg(not(coverage))]
647fn write_crontab(content: &str) -> Result<()> {
648    use std::io::Write;
649    let mut child = std::process::Command::new("crontab")
650        .arg("-")
651        .stdin(std::process::Stdio::piped())
652        .spawn()?;
653    child
654        .stdin
655        .as_mut()
656        .unwrap()
657        .write_all(content.as_bytes())?;
658    let status = child.wait()?;
659    if !status.success() {
660        anyhow::bail!("crontab write failed");
661    }
662    Ok(())
663}
664
665/// Build the crontab line for a pulpo schedule.
666#[cfg_attr(coverage, allow(dead_code))]
667fn build_crontab_line(
668    name: &str,
669    cron: &str,
670    workdir: &str,
671    provider: &str,
672    prompt: &str,
673    node: &str,
674) -> String {
675    format!(
676        "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
677    )
678}
679
680/// Install a cron schedule into a crontab string. Returns the updated crontab.
681#[cfg_attr(coverage, allow(dead_code))]
682fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
683    let tag = format!("{CRONTAB_TAG}{name}");
684    if crontab.contains(&tag) {
685        anyhow::bail!("schedule \"{name}\" already exists — remove it first");
686    }
687    let mut result = crontab.to_owned();
688    result.push_str(line);
689    Ok(result)
690}
691
692/// Format pulpo crontab entries for display.
693#[cfg_attr(coverage, allow(dead_code))]
694fn crontab_list(crontab: &str) -> String {
695    let entries: Vec<&str> = crontab
696        .lines()
697        .filter(|l| l.contains(CRONTAB_TAG))
698        .collect();
699    if entries.is_empty() {
700        return "No pulpo schedules.".into();
701    }
702    let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
703    for entry in entries {
704        let paused = entry.starts_with('#');
705        let raw = entry.trim_start_matches('#').trim();
706        let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
707        let parts: Vec<&str> = raw.splitn(6, ' ').collect();
708        let cron_expr = if parts.len() >= 5 {
709            parts[..5].join(" ")
710        } else {
711            "?".into()
712        };
713        lines.push(format!(
714            "{:<20} {:<15} {}",
715            name,
716            cron_expr,
717            if paused { "yes" } else { "no" }
718        ));
719    }
720    lines.join("\n")
721}
722
723/// Remove a schedule from a crontab string. Returns the updated crontab.
724#[cfg_attr(coverage, allow(dead_code))]
725fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
726    use std::fmt::Write;
727    let tag = format!("{CRONTAB_TAG}{name}");
728    let filtered =
729        crontab
730            .lines()
731            .filter(|l| !l.contains(&tag))
732            .fold(String::new(), |mut acc, l| {
733                writeln!(acc, "{l}").unwrap();
734                acc
735            });
736    if filtered.len() == crontab.len() {
737        anyhow::bail!("schedule \"{name}\" not found");
738    }
739    Ok(filtered)
740}
741
742/// Pause (comment out) a schedule in a crontab string.
743#[cfg_attr(coverage, allow(dead_code))]
744fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
745    use std::fmt::Write;
746    let tag = format!("{CRONTAB_TAG}{name}");
747    let mut found = false;
748    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
749        if l.contains(&tag) && !l.starts_with('#') {
750            found = true;
751            writeln!(acc, "#{l}").unwrap();
752        } else {
753            writeln!(acc, "{l}").unwrap();
754        }
755        acc
756    });
757    if !found {
758        anyhow::bail!("schedule \"{name}\" not found or already paused");
759    }
760    Ok(updated)
761}
762
763/// Resume (uncomment) a schedule in a crontab string.
764#[cfg_attr(coverage, allow(dead_code))]
765fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
766    use std::fmt::Write;
767    let tag = format!("{CRONTAB_TAG}{name}");
768    let mut found = false;
769    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
770        if l.contains(&tag) && l.starts_with('#') {
771            found = true;
772            writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
773        } else {
774            writeln!(acc, "{l}").unwrap();
775        }
776        acc
777    });
778    if !found {
779        anyhow::bail!("schedule \"{name}\" not found or not paused");
780    }
781    Ok(updated)
782}
783
784/// Execute a schedule subcommand using the crontab wrapper.
785#[cfg(not(coverage))]
786fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
787    match action {
788        ScheduleAction::Install {
789            name,
790            cron,
791            workdir,
792            provider,
793            prompt,
794        } => {
795            let crontab = read_crontab()?;
796            let joined_prompt = prompt.join(" ");
797            let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
798            let updated = crontab_install(&crontab, name, &line)?;
799            write_crontab(&updated)?;
800            Ok(format!("Installed schedule \"{name}\""))
801        }
802        ScheduleAction::List => {
803            let crontab = read_crontab()?;
804            Ok(crontab_list(&crontab))
805        }
806        ScheduleAction::Remove { name } => {
807            let crontab = read_crontab()?;
808            let updated = crontab_remove(&crontab, name)?;
809            write_crontab(&updated)?;
810            Ok(format!("Removed schedule \"{name}\""))
811        }
812        ScheduleAction::Pause { name } => {
813            let crontab = read_crontab()?;
814            let updated = crontab_pause(&crontab, name)?;
815            write_crontab(&updated)?;
816            Ok(format!("Paused schedule \"{name}\""))
817        }
818        ScheduleAction::Resume { name } => {
819            let crontab = read_crontab()?;
820            let updated = crontab_resume(&crontab, name)?;
821            write_crontab(&updated)?;
822            Ok(format!("Resumed schedule \"{name}\""))
823        }
824    }
825}
826
/// Stub for coverage builds — crontab is real I/O.
///
/// Ignores both arguments and returns an empty string, so no crontab is read or
/// written under `cfg(coverage)`.
#[cfg(coverage)]
fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
    Ok(String::new())
}
832
833/// Execute the given CLI command against the specified node.
834#[allow(clippy::too_many_lines)]
835pub async fn execute(cli: &Cli) -> Result<String> {
836    let url = base_url(&cli.node);
837    let client = reqwest::Client::new();
838    let node = &cli.node;
839    let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;
840
841    match &cli.command {
842        Commands::Attach { name } => {
843            // Fetch session to get actual backend_session_id
844            let resp = authed_get(
845                &client,
846                format!("{url}/api/v1/sessions/{name}"),
847                token.as_deref(),
848            )
849            .send()
850            .await
851            .map_err(|e| friendly_error(&e, node))?;
852            let text = ok_or_api_error(resp).await?;
853            let session: Session = serde_json::from_str(&text)?;
854            let backend_id = session
855                .backend_session_id
856                .unwrap_or_else(|| format!("pulpo-{name}"));
857            attach_session(&backend_id)?;
858            Ok(format!("Detached from session {name}."))
859        }
860        Commands::Input { name, text } => {
861            let input_text = text.as_deref().unwrap_or("\n");
862            let body = serde_json::json!({ "text": input_text });
863            let resp = authed_post(
864                &client,
865                format!("{url}/api/v1/sessions/{name}/input"),
866                token.as_deref(),
867            )
868            .json(&body)
869            .send()
870            .await
871            .map_err(|e| friendly_error(&e, node))?;
872            ok_or_api_error(resp).await?;
873            Ok(format!("Sent input to session {name}."))
874        }
875        Commands::List => {
876            let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
877                .send()
878                .await
879                .map_err(|e| friendly_error(&e, node))?;
880            let text = ok_or_api_error(resp).await?;
881            let sessions: Vec<Session> = serde_json::from_str(&text)?;
882            Ok(format_sessions(&sessions))
883        }
884        Commands::Nodes => {
885            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
886                .send()
887                .await
888                .map_err(|e| friendly_error(&e, node))?;
889            let text = ok_or_api_error(resp).await?;
890            let resp: PeersResponse = serde_json::from_str(&text)?;
891            Ok(format_nodes(&resp))
892        }
893        Commands::Spawn {
894            workdir,
895            name,
896            provider,
897            auto,
898            unrestricted,
899            model,
900            system_prompt,
901            allowed_tools,
902            ink,
903            max_turns,
904            max_budget,
905            output_format,
906            prompt,
907        } => {
908            let prompt_text = prompt.join(" ");
909            let mode = if *auto { "autonomous" } else { "interactive" };
910            // Resolve workdir: --workdir flag > current directory
911            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
912                std::env::current_dir()
913                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
914            });
915            let mut body = serde_json::json!({
916                "workdir": resolved_workdir,
917                "mode": mode,
918            });
919            // Only include prompt if non-empty
920            if !prompt_text.is_empty() {
921                body["prompt"] = serde_json::json!(prompt_text);
922            }
923            // Only include provider if explicitly specified
924            if let Some(p) = provider {
925                body["provider"] = serde_json::json!(p);
926            }
927            if *unrestricted {
928                body["unrestricted"] = serde_json::json!(true);
929            }
930            if let Some(n) = name {
931                body["name"] = serde_json::json!(n);
932            }
933            if let Some(m) = model {
934                body["model"] = serde_json::json!(m);
935            }
936            if let Some(sp) = system_prompt {
937                body["system_prompt"] = serde_json::json!(sp);
938            }
939            if let Some(tools) = allowed_tools {
940                body["allowed_tools"] = serde_json::json!(tools);
941            }
942            if let Some(p) = ink {
943                body["ink"] = serde_json::json!(p);
944            }
945            if let Some(mt) = max_turns {
946                body["max_turns"] = serde_json::json!(mt);
947            }
948            if let Some(mb) = max_budget {
949                body["max_budget_usd"] = serde_json::json!(mb);
950            }
951            if let Some(of) = output_format {
952                body["output_format"] = serde_json::json!(of);
953            }
954            let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
955                .json(&body)
956                .send()
957                .await
958                .map_err(|e| friendly_error(&e, node))?;
959            let text = ok_or_api_error(resp).await?;
960            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
961            let mut msg = format!(
962                "Created session \"{}\" ({})",
963                resp.session.name, resp.session.id
964            );
965            for w in &resp.warnings {
966                use std::fmt::Write;
967                let _ = write!(msg, "\n  Warning: {w}");
968            }
969            Ok(msg)
970        }
971        Commands::Kill { name } => {
972            let resp = authed_post(
973                &client,
974                format!("{url}/api/v1/sessions/{name}/kill"),
975                token.as_deref(),
976            )
977            .send()
978            .await
979            .map_err(|e| friendly_error(&e, node))?;
980            ok_or_api_error(resp).await?;
981            Ok(format!("Session {name} killed."))
982        }
983        Commands::Delete { name } => {
984            let resp = authed_delete(
985                &client,
986                format!("{url}/api/v1/sessions/{name}"),
987                token.as_deref(),
988            )
989            .send()
990            .await
991            .map_err(|e| friendly_error(&e, node))?;
992            ok_or_api_error(resp).await?;
993            Ok(format!("Session {name} deleted."))
994        }
995        Commands::Logs {
996            name,
997            lines,
998            follow,
999        } => {
1000            if *follow {
1001                let mut stdout = std::io::stdout();
1002                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1003                    .await
1004                    .map_err(|e| {
1005                        // Unwrap reqwest errors to friendly messages
1006                        match e.downcast::<reqwest::Error>() {
1007                            Ok(re) => friendly_error(&re, node),
1008                            Err(other) => other,
1009                        }
1010                    })?;
1011                Ok(String::new())
1012            } else {
1013                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1014                    .await
1015                    .map_err(|e| match e.downcast::<reqwest::Error>() {
1016                        Ok(re) => friendly_error(&re, node),
1017                        Err(other) => other,
1018                    })?;
1019                Ok(output)
1020            }
1021        }
1022        Commands::Interventions { name } => {
1023            let resp = authed_get(
1024                &client,
1025                format!("{url}/api/v1/sessions/{name}/interventions"),
1026                token.as_deref(),
1027            )
1028            .send()
1029            .await
1030            .map_err(|e| friendly_error(&e, node))?;
1031            let text = ok_or_api_error(resp).await?;
1032            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1033            Ok(format_interventions(&events))
1034        }
1035        Commands::Ui => {
1036            let dashboard = base_url(&cli.node);
1037            open_browser(&dashboard)?;
1038            Ok(format!("Opening {dashboard}"))
1039        }
1040        Commands::Resume { name } => {
1041            let resp = authed_post(
1042                &client,
1043                format!("{url}/api/v1/sessions/{name}/resume"),
1044                token.as_deref(),
1045            )
1046            .send()
1047            .await
1048            .map_err(|e| friendly_error(&e, node))?;
1049            let text = ok_or_api_error(resp).await?;
1050            let session: Session = serde_json::from_str(&text)?;
1051            Ok(format!("Resumed session \"{}\"", session.name))
1052        }
1053        Commands::Knowledge {
1054            session,
1055            kind,
1056            repo,
1057            ink,
1058            limit,
1059            context,
1060            get,
1061            delete,
1062            push,
1063        } => {
1064            // Single-item get
1065            if let Some(id) = get {
1066                let endpoint = format!("{url}/api/v1/knowledge/{id}");
1067                let resp = authed_get(&client, endpoint, token.as_deref())
1068                    .send()
1069                    .await
1070                    .map_err(|e| friendly_error(&e, node))?;
1071                let text = ok_or_api_error(resp).await?;
1072                let resp: KnowledgeItemResponse = serde_json::from_str(&text)?;
1073                return Ok(format_knowledge(&[resp.knowledge]));
1074            }
1075
1076            // Delete by ID
1077            if let Some(id) = delete {
1078                let endpoint = format!("{url}/api/v1/knowledge/{id}");
1079                let resp = authed_delete(&client, endpoint, token.as_deref())
1080                    .send()
1081                    .await
1082                    .map_err(|e| friendly_error(&e, node))?;
1083                let text = ok_or_api_error(resp).await?;
1084                let resp: KnowledgeDeleteResponse = serde_json::from_str(&text)?;
1085                return Ok(if resp.deleted {
1086                    format!("Deleted knowledge item {id}")
1087                } else {
1088                    format!("Knowledge item {id} not found")
1089                });
1090            }
1091
1092            // Push to remote
1093            if *push {
1094                let endpoint = format!("{url}/api/v1/knowledge/push");
1095                let resp = authed_post(&client, endpoint, token.as_deref())
1096                    .send()
1097                    .await
1098                    .map_err(|e| friendly_error(&e, node))?;
1099                let text = ok_or_api_error(resp).await?;
1100                let resp: KnowledgePushResponse = serde_json::from_str(&text)?;
1101                return Ok(resp.message);
1102            }
1103
1104            // List / context query
1105            let mut params = vec![format!("limit={limit}")];
1106            let endpoint = if *context {
1107                if let Some(r) = repo {
1108                    params.push(format!("workdir={r}"));
1109                }
1110                if let Some(i) = ink {
1111                    params.push(format!("ink={i}"));
1112                }
1113                format!("{url}/api/v1/knowledge/context?{}", params.join("&"))
1114            } else {
1115                if let Some(s) = session {
1116                    params.push(format!("session_id={s}"));
1117                }
1118                if let Some(k) = kind {
1119                    params.push(format!("kind={k}"));
1120                }
1121                if let Some(r) = repo {
1122                    params.push(format!("repo={r}"));
1123                }
1124                if let Some(i) = ink {
1125                    params.push(format!("ink={i}"));
1126                }
1127                format!("{url}/api/v1/knowledge?{}", params.join("&"))
1128            };
1129            let resp = authed_get(&client, endpoint, token.as_deref())
1130                .send()
1131                .await
1132                .map_err(|e| friendly_error(&e, node))?;
1133            let text = ok_or_api_error(resp).await?;
1134            let resp: KnowledgeResponse = serde_json::from_str(&text)?;
1135            Ok(format_knowledge(&resp.knowledge))
1136        }
1137        Commands::Schedule { action } => execute_schedule(action, node),
1138    }
1139}
1140
1141#[cfg(test)]
1142mod tests {
1143    use super::*;
1144
1145    #[test]
1146    fn test_base_url() {
1147        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1148        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1149    }
1150
1151    #[test]
1152    fn test_cli_parse_list() {
1153        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1154        assert_eq!(cli.node, "localhost:7433");
1155        assert!(matches!(cli.command, Commands::List));
1156    }
1157
1158    #[test]
1159    fn test_cli_parse_nodes() {
1160        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1161        assert!(matches!(cli.command, Commands::Nodes));
1162    }
1163
1164    #[test]
1165    fn test_cli_parse_ui() {
1166        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1167        assert!(matches!(cli.command, Commands::Ui));
1168    }
1169
1170    #[test]
1171    fn test_cli_parse_ui_custom_node() {
1172        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1173        assert!(matches!(cli.command, Commands::Ui));
1174        assert_eq!(cli.node, "mac-mini:7433");
1175    }
1176
1177    #[test]
1178    fn test_build_open_command() {
1179        let cmd = build_open_command("http://localhost:7433");
1180        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1181        assert_eq!(args, vec!["http://localhost:7433"]);
1182        #[cfg(target_os = "macos")]
1183        assert_eq!(cmd.get_program(), "open");
1184        #[cfg(target_os = "linux")]
1185        assert_eq!(cmd.get_program(), "xdg-open");
1186    }
1187
1188    #[test]
1189    fn test_cli_parse_spawn() {
1190        let cli = Cli::try_parse_from([
1191            "pulpo",
1192            "spawn",
1193            "--workdir",
1194            "/tmp/repo",
1195            "Fix",
1196            "the",
1197            "bug",
1198        ])
1199        .unwrap();
1200        assert!(matches!(
1201            &cli.command,
1202            Commands::Spawn { workdir, provider, auto, unrestricted, prompt, .. }
1203                if workdir.as_deref() == Some("/tmp/repo") && provider.is_none() && !auto
1204                && !unrestricted && prompt == &["Fix", "the", "bug"]
1205        ));
1206    }
1207
1208    #[test]
1209    fn test_cli_parse_spawn_with_provider() {
1210        let cli = Cli::try_parse_from([
1211            "pulpo",
1212            "spawn",
1213            "--workdir",
1214            "/tmp",
1215            "--provider",
1216            "codex",
1217            "Do it",
1218        ])
1219        .unwrap();
1220        assert!(matches!(
1221            &cli.command,
1222            Commands::Spawn { provider, .. } if provider.as_deref() == Some("codex")
1223        ));
1224    }
1225
1226    #[test]
1227    fn test_cli_parse_spawn_auto() {
1228        let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "--auto", "Do it"])
1229            .unwrap();
1230        assert!(matches!(
1231            &cli.command,
1232            Commands::Spawn { auto, .. } if *auto
1233        ));
1234    }
1235
1236    #[test]
1237    fn test_cli_parse_spawn_unrestricted() {
1238        let cli = Cli::try_parse_from([
1239            "pulpo",
1240            "spawn",
1241            "--workdir",
1242            "/tmp",
1243            "--unrestricted",
1244            "Do it",
1245        ])
1246        .unwrap();
1247        assert!(matches!(
1248            &cli.command,
1249            Commands::Spawn { unrestricted, .. } if *unrestricted
1250        ));
1251    }
1252
1253    #[test]
1254    fn test_cli_parse_spawn_unrestricted_default() {
1255        let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "Do it"]).unwrap();
1256        assert!(matches!(
1257            &cli.command,
1258            Commands::Spawn { unrestricted, .. } if !unrestricted
1259        ));
1260    }
1261
1262    #[test]
1263    fn test_cli_parse_spawn_with_name() {
1264        let cli = Cli::try_parse_from([
1265            "pulpo",
1266            "spawn",
1267            "--workdir",
1268            "/tmp/repo",
1269            "--name",
1270            "my-task",
1271            "Fix it",
1272        ])
1273        .unwrap();
1274        assert!(matches!(
1275            &cli.command,
1276            Commands::Spawn { workdir, name, .. }
1277                if workdir.as_deref() == Some("/tmp/repo") && name.as_deref() == Some("my-task")
1278        ));
1279    }
1280
1281    #[test]
1282    fn test_cli_parse_spawn_without_name() {
1283        let cli =
1284            Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"]).unwrap();
1285        assert!(matches!(
1286            &cli.command,
1287            Commands::Spawn { name, .. } if name.is_none()
1288        ));
1289    }
1290
1291    #[test]
1292    fn test_cli_parse_logs() {
1293        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1294        assert!(matches!(
1295            &cli.command,
1296            Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1297        ));
1298    }
1299
1300    #[test]
1301    fn test_cli_parse_logs_with_lines() {
1302        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1303        assert!(matches!(
1304            &cli.command,
1305            Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 50 && !follow
1306        ));
1307    }
1308
1309    #[test]
1310    fn test_cli_parse_logs_follow() {
1311        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1312        assert!(matches!(
1313            &cli.command,
1314            Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1315        ));
1316    }
1317
1318    #[test]
1319    fn test_cli_parse_logs_follow_short() {
1320        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1321        assert!(matches!(
1322            &cli.command,
1323            Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1324        ));
1325    }
1326
1327    #[test]
1328    fn test_cli_parse_kill() {
1329        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1330        assert!(matches!(
1331            &cli.command,
1332            Commands::Kill { name } if name == "my-session"
1333        ));
1334    }
1335
1336    #[test]
1337    fn test_cli_parse_delete() {
1338        let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1339        assert!(matches!(
1340            &cli.command,
1341            Commands::Delete { name } if name == "my-session"
1342        ));
1343    }
1344
1345    #[test]
1346    fn test_cli_parse_resume() {
1347        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1348        assert!(matches!(
1349            &cli.command,
1350            Commands::Resume { name } if name == "my-session"
1351        ));
1352    }
1353
1354    #[test]
1355    fn test_cli_parse_input() {
1356        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1357        assert!(matches!(
1358            &cli.command,
1359            Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("yes")
1360        ));
1361    }
1362
1363    #[test]
1364    fn test_cli_parse_input_no_text() {
1365        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1366        assert!(matches!(
1367            &cli.command,
1368            Commands::Input { name, text } if name == "my-session" && text.is_none()
1369        ));
1370    }
1371
1372    #[test]
1373    fn test_cli_parse_input_alias() {
1374        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1375        assert!(matches!(
1376            &cli.command,
1377            Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("y")
1378        ));
1379    }
1380
1381    #[test]
1382    fn test_cli_parse_custom_node() {
1383        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1384        assert_eq!(cli.node, "win-pc:8080");
1385    }
1386
1387    #[test]
1388    fn test_cli_version() {
1389        let result = Cli::try_parse_from(["pulpo", "--version"]);
1390        // clap exits with an error (kind DisplayVersion) when --version is used
1391        let err = result.unwrap_err();
1392        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1393    }
1394
1395    #[test]
1396    fn test_cli_parse_no_subcommand_fails() {
1397        let result = Cli::try_parse_from(["pulpo"]);
1398        assert!(result.is_err());
1399    }
1400
1401    #[test]
1402    fn test_cli_debug() {
1403        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1404        let debug = format!("{cli:?}");
1405        assert!(debug.contains("List"));
1406    }
1407
1408    #[test]
1409    fn test_commands_debug() {
1410        let cmd = Commands::List;
1411        assert_eq!(format!("{cmd:?}"), "List");
1412    }
1413
1414    /// A valid Session JSON for test responses.
1415    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","provider":"claude","prompt":"Fix bug","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1416
1417    /// A valid `CreateSessionResponse` JSON wrapping the session.
1418    fn test_create_response_json() -> String {
1419        format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1420    }
1421
1422    /// Start a lightweight test HTTP server and return its address.
1423    async fn start_test_server() -> String {
1424        use axum::http::StatusCode;
1425        use axum::{
1426            Json, Router,
1427            routing::{get, post},
1428        };
1429
1430        let create_json = test_create_response_json();
1431
1432        let app = Router::new()
1433            .route(
1434                "/api/v1/sessions",
1435                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1436                    (StatusCode::CREATED, create_json.clone())
1437                }),
1438            )
1439            .route(
1440                "/api/v1/sessions/{id}",
1441                get(|| async { TEST_SESSION_JSON.to_owned() })
1442                    .delete(|| async { StatusCode::NO_CONTENT }),
1443            )
1444            .route(
1445                "/api/v1/sessions/{id}/kill",
1446                post(|| async { StatusCode::NO_CONTENT }),
1447            )
1448            .route(
1449                "/api/v1/sessions/{id}/output",
1450                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1451            )
1452            .route(
1453                "/api/v1/peers",
1454                get(|| async {
1455                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1456                }),
1457            )
1458            .route(
1459                "/api/v1/sessions/{id}/resume",
1460                axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1461            )
1462            .route(
1463                "/api/v1/sessions/{id}/interventions",
1464                get(|| async { "[]".to_owned() }),
1465            )
1466            .route(
1467                "/api/v1/sessions/{id}/input",
1468                post(|| async { StatusCode::NO_CONTENT }),
1469            );
1470
1471        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1472        let addr = listener.local_addr().unwrap();
1473        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1474        format!("127.0.0.1:{}", addr.port())
1475    }
1476
1477    #[tokio::test]
1478    async fn test_execute_list_success() {
1479        let node = start_test_server().await;
1480        let cli = Cli {
1481            node,
1482            token: None,
1483            command: Commands::List,
1484        };
1485        let result = execute(&cli).await.unwrap();
1486        assert_eq!(result, "No sessions.");
1487    }
1488
1489    #[tokio::test]
1490    async fn test_execute_nodes_success() {
1491        let node = start_test_server().await;
1492        let cli = Cli {
1493            node,
1494            token: None,
1495            command: Commands::Nodes,
1496        };
1497        let result = execute(&cli).await.unwrap();
1498        assert!(result.contains("test"));
1499        assert!(result.contains("(local)"));
1500        assert!(result.contains("NAME"));
1501    }
1502
1503    #[tokio::test]
1504    async fn test_execute_spawn_success() {
1505        let node = start_test_server().await;
1506        let cli = Cli {
1507            node,
1508            token: None,
1509            command: Commands::Spawn {
1510                workdir: Some("/tmp/repo".into()),
1511                name: None,
1512                provider: Some("claude".into()),
1513                auto: false,
1514                unrestricted: false,
1515                model: None,
1516                system_prompt: None,
1517                allowed_tools: None,
1518                ink: None,
1519                max_turns: None,
1520                max_budget: None,
1521                output_format: None,
1522                prompt: vec!["Fix".into(), "bug".into()],
1523            },
1524        };
1525        let result = execute(&cli).await.unwrap();
1526        assert!(result.contains("Created session"));
1527        assert!(result.contains("repo"));
1528    }
1529
1530    #[tokio::test]
1531    async fn test_execute_spawn_with_all_new_flags() {
1532        let node = start_test_server().await;
1533        let cli = Cli {
1534            node,
1535            token: None,
1536            command: Commands::Spawn {
1537                workdir: Some("/tmp/repo".into()),
1538                name: None,
1539                provider: Some("claude".into()),
1540                auto: false,
1541                unrestricted: false,
1542                model: Some("opus".into()),
1543                system_prompt: Some("Be helpful".into()),
1544                allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1545                ink: Some("coder".into()),
1546                max_turns: Some(5),
1547                max_budget: Some(2.5),
1548                output_format: Some("json".into()),
1549                prompt: vec!["Fix".into(), "bug".into()],
1550            },
1551        };
1552        let result = execute(&cli).await.unwrap();
1553        assert!(result.contains("Created session"));
1554    }
1555
1556    #[tokio::test]
1557    async fn test_execute_spawn_auto_mode() {
1558        let node = start_test_server().await;
1559        let cli = Cli {
1560            node,
1561            token: None,
1562            command: Commands::Spawn {
1563                workdir: Some("/tmp/repo".into()),
1564                name: None,
1565                provider: Some("claude".into()),
1566                auto: true,
1567                unrestricted: false,
1568                model: None,
1569                system_prompt: None,
1570                allowed_tools: None,
1571                ink: None,
1572                max_turns: None,
1573                max_budget: None,
1574                output_format: None,
1575                prompt: vec!["Do it".into()],
1576            },
1577        };
1578        let result = execute(&cli).await.unwrap();
1579        assert!(result.contains("Created session"));
1580    }
1581
1582    #[tokio::test]
1583    async fn test_execute_spawn_with_name() {
1584        let node = start_test_server().await;
1585        let cli = Cli {
1586            node,
1587            token: None,
1588            command: Commands::Spawn {
1589                workdir: Some("/tmp/repo".into()),
1590                name: Some("my-task".into()),
1591                provider: Some("claude".into()),
1592                auto: false,
1593                unrestricted: false,
1594                model: None,
1595                system_prompt: None,
1596                allowed_tools: None,
1597                ink: None,
1598                max_turns: None,
1599                max_budget: None,
1600                output_format: None,
1601                prompt: vec!["Fix".into(), "bug".into()],
1602            },
1603        };
1604        let result = execute(&cli).await.unwrap();
1605        assert!(result.contains("Created session"));
1606    }
1607
1608    #[tokio::test]
1609    async fn test_execute_kill_success() {
1610        let node = start_test_server().await;
1611        let cli = Cli {
1612            node,
1613            token: None,
1614            command: Commands::Kill {
1615                name: "test-session".into(),
1616            },
1617        };
1618        let result = execute(&cli).await.unwrap();
1619        assert!(result.contains("killed"));
1620    }
1621
1622    #[tokio::test]
1623    async fn test_execute_delete_success() {
1624        let node = start_test_server().await;
1625        let cli = Cli {
1626            node,
1627            token: None,
1628            command: Commands::Delete {
1629                name: "test-session".into(),
1630            },
1631        };
1632        let result = execute(&cli).await.unwrap();
1633        assert!(result.contains("deleted"));
1634    }
1635
1636    #[tokio::test]
1637    async fn test_execute_logs_success() {
1638        let node = start_test_server().await;
1639        let cli = Cli {
1640            node,
1641            token: None,
1642            command: Commands::Logs {
1643                name: "test-session".into(),
1644                lines: 50,
1645                follow: false,
1646            },
1647        };
1648        let result = execute(&cli).await.unwrap();
1649        assert!(result.contains("test output"));
1650    }
1651
1652    #[tokio::test]
1653    async fn test_execute_list_connection_refused() {
1654        let cli = Cli {
1655            node: "localhost:1".into(),
1656            token: None,
1657            command: Commands::List,
1658        };
1659        let result = execute(&cli).await;
1660        let err = result.unwrap_err().to_string();
1661        assert!(
1662            err.contains("Could not connect to pulpod"),
1663            "Expected friendly error, got: {err}"
1664        );
1665        assert!(err.contains("localhost:1"));
1666    }
1667
1668    #[tokio::test]
1669    async fn test_execute_nodes_connection_refused() {
1670        let cli = Cli {
1671            node: "localhost:1".into(),
1672            token: None,
1673            command: Commands::Nodes,
1674        };
1675        let result = execute(&cli).await;
1676        let err = result.unwrap_err().to_string();
1677        assert!(err.contains("Could not connect to pulpod"));
1678    }
1679
1680    #[tokio::test]
1681    async fn test_execute_kill_error_response() {
1682        use axum::{Router, http::StatusCode, routing::post};
1683
1684        let app = Router::new().route(
1685            "/api/v1/sessions/{id}/kill",
1686            post(|| async {
1687                (
1688                    StatusCode::NOT_FOUND,
1689                    "{\"error\":\"session not found: test-session\"}",
1690                )
1691            }),
1692        );
1693        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1694        let addr = listener.local_addr().unwrap();
1695        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1696        let node = format!("127.0.0.1:{}", addr.port());
1697
1698        let cli = Cli {
1699            node,
1700            token: None,
1701            command: Commands::Kill {
1702                name: "test-session".into(),
1703            },
1704        };
1705        let err = execute(&cli).await.unwrap_err();
1706        assert_eq!(err.to_string(), "session not found: test-session");
1707    }
1708
1709    #[tokio::test]
1710    async fn test_execute_delete_error_response() {
1711        use axum::{Router, http::StatusCode, routing::delete};
1712
1713        let app = Router::new().route(
1714            "/api/v1/sessions/{id}",
1715            delete(|| async {
1716                (
1717                    StatusCode::CONFLICT,
1718                    "{\"error\":\"cannot delete session in 'running' state\"}",
1719                )
1720            }),
1721        );
1722        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1723        let addr = listener.local_addr().unwrap();
1724        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1725        let node = format!("127.0.0.1:{}", addr.port());
1726
1727        let cli = Cli {
1728            node,
1729            token: None,
1730            command: Commands::Delete {
1731                name: "test-session".into(),
1732            },
1733        };
1734        let err = execute(&cli).await.unwrap_err();
1735        assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1736    }
1737
1738    #[tokio::test]
1739    async fn test_execute_logs_error_response() {
1740        use axum::{Router, http::StatusCode, routing::get};
1741
1742        let app = Router::new().route(
1743            "/api/v1/sessions/{id}/output",
1744            get(|| async {
1745                (
1746                    StatusCode::NOT_FOUND,
1747                    "{\"error\":\"session not found: ghost\"}",
1748                )
1749            }),
1750        );
1751        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1752        let addr = listener.local_addr().unwrap();
1753        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1754        let node = format!("127.0.0.1:{}", addr.port());
1755
1756        let cli = Cli {
1757            node,
1758            token: None,
1759            command: Commands::Logs {
1760                name: "ghost".into(),
1761                lines: 50,
1762                follow: false,
1763            },
1764        };
1765        let err = execute(&cli).await.unwrap_err();
1766        assert_eq!(err.to_string(), "session not found: ghost");
1767    }
1768
1769    #[tokio::test]
1770    async fn test_execute_resume_error_response() {
1771        use axum::{Router, http::StatusCode, routing::post};
1772
1773        let app = Router::new().route(
1774            "/api/v1/sessions/{id}/resume",
1775            post(|| async {
1776                (
1777                    StatusCode::BAD_REQUEST,
1778                    "{\"error\":\"session is not stale (status: running)\"}",
1779                )
1780            }),
1781        );
1782        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1783        let addr = listener.local_addr().unwrap();
1784        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1785        let node = format!("127.0.0.1:{}", addr.port());
1786
1787        let cli = Cli {
1788            node,
1789            token: None,
1790            command: Commands::Resume {
1791                name: "test-session".into(),
1792            },
1793        };
1794        let err = execute(&cli).await.unwrap_err();
1795        assert_eq!(err.to_string(), "session is not stale (status: running)");
1796    }
1797
1798    #[tokio::test]
1799    async fn test_execute_spawn_error_response() {
1800        use axum::{Router, http::StatusCode, routing::post};
1801
1802        let app = Router::new().route(
1803            "/api/v1/sessions",
1804            post(|| async {
1805                (
1806                    StatusCode::INTERNAL_SERVER_ERROR,
1807                    "{\"error\":\"failed to spawn session\"}",
1808                )
1809            }),
1810        );
1811        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1812        let addr = listener.local_addr().unwrap();
1813        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1814        let node = format!("127.0.0.1:{}", addr.port());
1815
1816        let cli = Cli {
1817            node,
1818            token: None,
1819            command: Commands::Spawn {
1820                workdir: Some("/tmp/repo".into()),
1821                name: None,
1822                provider: Some("claude".into()),
1823                auto: false,
1824                unrestricted: false,
1825                model: None,
1826                system_prompt: None,
1827                allowed_tools: None,
1828                ink: None,
1829                max_turns: None,
1830                max_budget: None,
1831                output_format: None,
1832                prompt: vec!["test".into()],
1833            },
1834        };
1835        let err = execute(&cli).await.unwrap_err();
1836        assert_eq!(err.to_string(), "failed to spawn session");
1837    }
1838
1839    #[tokio::test]
1840    async fn test_execute_interventions_error_response() {
1841        use axum::{Router, http::StatusCode, routing::get};
1842
1843        let app = Router::new().route(
1844            "/api/v1/sessions/{id}/interventions",
1845            get(|| async {
1846                (
1847                    StatusCode::NOT_FOUND,
1848                    "{\"error\":\"session not found: ghost\"}",
1849                )
1850            }),
1851        );
1852        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1853        let addr = listener.local_addr().unwrap();
1854        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1855        let node = format!("127.0.0.1:{}", addr.port());
1856
1857        let cli = Cli {
1858            node,
1859            token: None,
1860            command: Commands::Interventions {
1861                name: "ghost".into(),
1862            },
1863        };
1864        let err = execute(&cli).await.unwrap_err();
1865        assert_eq!(err.to_string(), "session not found: ghost");
1866    }
1867
1868    #[tokio::test]
1869    async fn test_execute_resume_success() {
1870        let node = start_test_server().await;
1871        let cli = Cli {
1872            node,
1873            token: None,
1874            command: Commands::Resume {
1875                name: "test-session".into(),
1876            },
1877        };
1878        let result = execute(&cli).await.unwrap();
1879        assert!(result.contains("Resumed session"));
1880        assert!(result.contains("repo"));
1881    }
1882
1883    #[tokio::test]
1884    async fn test_execute_input_success() {
1885        let node = start_test_server().await;
1886        let cli = Cli {
1887            node,
1888            token: None,
1889            command: Commands::Input {
1890                name: "test-session".into(),
1891                text: Some("yes".into()),
1892            },
1893        };
1894        let result = execute(&cli).await.unwrap();
1895        assert!(result.contains("Sent input to session test-session"));
1896    }
1897
1898    #[tokio::test]
1899    async fn test_execute_input_no_text() {
1900        let node = start_test_server().await;
1901        let cli = Cli {
1902            node,
1903            token: None,
1904            command: Commands::Input {
1905                name: "test-session".into(),
1906                text: None,
1907            },
1908        };
1909        let result = execute(&cli).await.unwrap();
1910        assert!(result.contains("Sent input to session test-session"));
1911    }
1912
1913    #[tokio::test]
1914    async fn test_execute_input_connection_refused() {
1915        let cli = Cli {
1916            node: "localhost:1".into(),
1917            token: None,
1918            command: Commands::Input {
1919                name: "test".into(),
1920                text: Some("y".into()),
1921            },
1922        };
1923        let result = execute(&cli).await;
1924        let err = result.unwrap_err().to_string();
1925        assert!(err.contains("Could not connect to pulpod"));
1926    }
1927
1928    #[tokio::test]
1929    async fn test_execute_input_error_response() {
1930        use axum::{Router, http::StatusCode, routing::post};
1931
1932        let app = Router::new().route(
1933            "/api/v1/sessions/{id}/input",
1934            post(|| async {
1935                (
1936                    StatusCode::NOT_FOUND,
1937                    "{\"error\":\"session not found: ghost\"}",
1938                )
1939            }),
1940        );
1941        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1942        let addr = listener.local_addr().unwrap();
1943        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1944        let node = format!("127.0.0.1:{}", addr.port());
1945
1946        let cli = Cli {
1947            node,
1948            token: None,
1949            command: Commands::Input {
1950                name: "ghost".into(),
1951                text: Some("y".into()),
1952            },
1953        };
1954        let err = execute(&cli).await.unwrap_err();
1955        assert_eq!(err.to_string(), "session not found: ghost");
1956    }
1957
1958    #[tokio::test]
1959    async fn test_execute_ui() {
1960        let cli = Cli {
1961            node: "localhost:7433".into(),
1962            token: None,
1963            command: Commands::Ui,
1964        };
1965        let result = execute(&cli).await.unwrap();
1966        assert!(result.contains("Opening"));
1967        assert!(result.contains("http://localhost:7433"));
1968    }
1969
1970    #[tokio::test]
1971    async fn test_execute_ui_custom_node() {
1972        let cli = Cli {
1973            node: "mac-mini:7433".into(),
1974            token: None,
1975            command: Commands::Ui,
1976        };
1977        let result = execute(&cli).await.unwrap();
1978        assert!(result.contains("http://mac-mini:7433"));
1979    }
1980
/// An empty session list is rendered as the fixed "No sessions." message.
#[test]
fn test_format_sessions_empty() {
    let rendered = format_sessions(&[]);
    assert_eq!(rendered, "No sessions.");
}
1985
/// A populated, running session is rendered with the table header plus its name, status,
/// provider and prompt.
#[test]
fn test_format_sessions_with_data() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        // Values the assertions below look for in the rendered output.
        name: "my-api".into(),
        status: SessionStatus::Running,
        provider: Provider::Claude,
        prompt: "Fix the bug".into(),
        // Remaining populated fields.
        id: Uuid::nil(),
        workdir: "/tmp/repo".into(),
        mode: SessionMode::Interactive,
        waiting_for_input: false,
        created_at: Utc::now(),
        updated_at: Utc::now(),
        // Every optional field stays unset.
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
    };

    let rendered = format_sessions(&[session]);
    for expected in ["NAME", "my-api", "running", "claude", "Fix the bug"] {
        assert!(rendered.contains(expected));
    }
}
2028
/// A long prompt is shown with an ellipsis ("...") in the rendered session table.
#[test]
fn test_format_sessions_long_prompt_truncated() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        // The prompt is the only field this test cares about.
        prompt: "A very long prompt that exceeds forty characters in total length".into(),
        // Remaining populated fields.
        id: Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        provider: Provider::Codex,
        status: SessionStatus::Completed,
        mode: SessionMode::Autonomous,
        waiting_for_input: false,
        created_at: Utc::now(),
        updated_at: Utc::now(),
        // Every optional field stays unset.
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
    };

    let rendered = format_sessions(&[session]);
    assert!(rendered.contains("..."));
}
2067
/// A running session flagged `waiting_for_input` is displayed as "waiting", and the word
/// "running" does not appear in the output.
#[test]
fn test_format_sessions_waiting_for_input() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        // The combination under test: status is Running but input is awaited.
        status: SessionStatus::Running,
        waiting_for_input: true,
        // Remaining populated fields.
        id: Uuid::nil(),
        name: "blocked".into(),
        workdir: "/tmp".into(),
        provider: Provider::Claude,
        prompt: "Fix bug".into(),
        mode: SessionMode::Interactive,
        created_at: Utc::now(),
        updated_at: Utc::now(),
        // Every optional field stays unset.
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
    };

    let rendered = format_sessions(&[session]);
    assert!(rendered.contains("waiting"));
    assert!(!rendered.contains("running"));
}
2107
/// The node table lists the local node (tagged "(local)") and each peer together with the
/// peer's session count.
#[test]
fn test_format_nodes() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let resp = PeersResponse {
        local: NodeInfo {
            name: "mac-mini".into(),
            hostname: "h".into(),
            os: "macos".into(),
            arch: "arm64".into(),
            cpus: 8,
            memory_mb: 16384,
            gpu: None,
        },
        peers: vec![PeerInfo {
            name: "win-pc".into(),
            address: "win-pc:7433".into(),
            status: PeerStatus::Online,
            node_info: None,
            // Chosen so its digits occur nowhere else in this fixture. A count of 3 would
            // also be matched by the port in `win-pc:7433` if the address is rendered,
            // making the session-count assertion vacuous.
            session_count: Some(42),
            source: PeerSource::Configured,
        }],
    };

    let output = format_nodes(&resp);
    assert!(output.contains("mac-mini"));
    assert!(output.contains("(local)"));
    assert!(output.contains("win-pc"));
    assert!(
        output.contains("42"),
        "Expected peer session count in output, got: {output}"
    );
}
2138
/// An offline peer without a session count is shown as "offline", with a "-" on its row.
#[test]
fn test_format_nodes_no_session_count() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "local".into(),
        hostname: "h".into(),
        os: "linux".into(),
        arch: "x86_64".into(),
        cpus: 4,
        memory_mb: 8192,
        gpu: None,
    };
    let offline_peer = PeerInfo {
        name: "peer".into(),
        address: "peer:7433".into(),
        status: PeerStatus::Offline,
        node_info: None,
        session_count: None,
        source: PeerSource::Configured,
    };
    let resp = PeersResponse {
        local,
        peers: vec![offline_peer],
    };

    let output = format_nodes(&resp);
    assert!(output.contains("offline"));
    // No session count → shows "-" on the third output line.
    let third_line = output.lines().nth(2).unwrap();
    assert!(third_line.contains('-'));
}
2169
2170    #[tokio::test]
2171    async fn test_execute_resume_connection_refused() {
2172        let cli = Cli {
2173            node: "localhost:1".into(),
2174            token: None,
2175            command: Commands::Resume {
2176                name: "test".into(),
2177            },
2178        };
2179        let result = execute(&cli).await;
2180        let err = result.unwrap_err().to_string();
2181        assert!(err.contains("Could not connect to pulpod"));
2182    }
2183
2184    #[tokio::test]
2185    async fn test_execute_spawn_connection_refused() {
2186        let cli = Cli {
2187            node: "localhost:1".into(),
2188            token: None,
2189            command: Commands::Spawn {
2190                workdir: Some("/tmp".into()),
2191                name: None,
2192                provider: Some("claude".into()),
2193                auto: false,
2194                unrestricted: false,
2195                model: None,
2196                system_prompt: None,
2197                allowed_tools: None,
2198                ink: None,
2199                max_turns: None,
2200                max_budget: None,
2201                output_format: None,
2202                prompt: vec!["test".into()],
2203            },
2204        };
2205        let result = execute(&cli).await;
2206        let err = result.unwrap_err().to_string();
2207        assert!(err.contains("Could not connect to pulpod"));
2208    }
2209
2210    #[tokio::test]
2211    async fn test_execute_kill_connection_refused() {
2212        let cli = Cli {
2213            node: "localhost:1".into(),
2214            token: None,
2215            command: Commands::Kill {
2216                name: "test".into(),
2217            },
2218        };
2219        let result = execute(&cli).await;
2220        let err = result.unwrap_err().to_string();
2221        assert!(err.contains("Could not connect to pulpod"));
2222    }
2223
2224    #[tokio::test]
2225    async fn test_execute_delete_connection_refused() {
2226        let cli = Cli {
2227            node: "localhost:1".into(),
2228            token: None,
2229            command: Commands::Delete {
2230                name: "test".into(),
2231            },
2232        };
2233        let result = execute(&cli).await;
2234        let err = result.unwrap_err().to_string();
2235        assert!(err.contains("Could not connect to pulpod"));
2236    }
2237
2238    #[tokio::test]
2239    async fn test_execute_logs_connection_refused() {
2240        let cli = Cli {
2241            node: "localhost:1".into(),
2242            token: None,
2243            command: Commands::Logs {
2244                name: "test".into(),
2245                lines: 50,
2246                follow: false,
2247            },
2248        };
2249        let result = execute(&cli).await;
2250        let err = result.unwrap_err().to_string();
2251        assert!(err.contains("Could not connect to pulpod"));
2252    }
2253
2254    #[tokio::test]
2255    async fn test_friendly_error_connect() {
2256        // Make a request to a closed port to get a connect error
2257        let err = reqwest::Client::new()
2258            .get("http://127.0.0.1:1")
2259            .send()
2260            .await
2261            .unwrap_err();
2262        let friendly = friendly_error(&err, "test-node:1");
2263        let msg = friendly.to_string();
2264        assert!(
2265            msg.contains("Could not connect"),
2266            "Expected connect message, got: {msg}"
2267        );
2268    }
2269
2270    #[tokio::test]
2271    async fn test_friendly_error_other() {
2272        // A request to an invalid URL creates a builder error, not a connect error
2273        let err = reqwest::Client::new()
2274            .get("http://[::invalid::url")
2275            .send()
2276            .await
2277            .unwrap_err();
2278        let friendly = friendly_error(&err, "bad-host");
2279        let msg = friendly.to_string();
2280        assert!(
2281            msg.contains("Network error"),
2282            "Expected network error message, got: {msg}"
2283        );
2284        assert!(msg.contains("bad-host"));
2285    }
2286
2287    // -- Auth helper tests --
2288
/// `is_localhost` accepts the loopback spellings, with or without a port, and rejects
/// other hosts.
#[test]
fn test_is_localhost_variants() {
    let loopback = [
        "localhost:7433",
        "127.0.0.1:7433",
        "[::1]:7433",
        "::1",
        "localhost",
    ];
    for node in loopback {
        assert!(is_localhost(node));
    }

    let remote = ["mac-mini:7433", "192.168.1.100:7433"];
    for node in remote {
        assert!(!is_localhost(node));
    }
}
2299
/// `authed_get` with a token attaches it as a `Bearer` authorization header.
#[test]
fn test_authed_get_with_token() {
    let http = reqwest::Client::new();
    let request = authed_get(&http, "http://h:1/api".into(), Some("tok"))
        .build()
        .unwrap();

    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer tok");
}
2314
/// `authed_get` without a token leaves the authorization header unset.
#[test]
fn test_authed_get_without_token() {
    let http = reqwest::Client::new();
    let request = authed_get(&http, "http://h:1/api".into(), None)
        .build()
        .unwrap();

    let header = request.headers().get("authorization");
    assert!(header.is_none());
}
2323
/// `authed_post` with a token attaches it as a `Bearer` authorization header.
#[test]
fn test_authed_post_with_token() {
    let http = reqwest::Client::new();
    let request = authed_post(&http, "http://h:1/api".into(), Some("secret"))
        .build()
        .unwrap();

    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer secret");
}
2338
/// `authed_post` without a token leaves the authorization header unset.
#[test]
fn test_authed_post_without_token() {
    let http = reqwest::Client::new();
    let request = authed_post(&http, "http://h:1/api".into(), None)
        .build()
        .unwrap();

    let header = request.headers().get("authorization");
    assert!(header.is_none());
}
2347
/// `authed_delete` with a token attaches it as a `Bearer` authorization header.
#[test]
fn test_authed_delete_with_token() {
    let http = reqwest::Client::new();
    let request = authed_delete(&http, "http://h:1/api".into(), Some("del-tok"))
        .build()
        .unwrap();

    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer del-tok");
}
2362
/// `authed_delete` without a token leaves the authorization header unset.
#[test]
fn test_authed_delete_without_token() {
    let http = reqwest::Client::new();
    let request = authed_delete(&http, "http://h:1/api".into(), None)
        .build()
        .unwrap();

    let header = request.headers().get("authorization");
    assert!(header.is_none());
}
2371
2372    #[tokio::test]
2373    async fn test_resolve_token_explicit() {
2374        let client = reqwest::Client::new();
2375        let token =
2376            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2377        assert_eq!(token, Some("my-tok".into()));
2378    }
2379
2380    #[tokio::test]
2381    async fn test_resolve_token_remote_no_explicit() {
2382        let client = reqwest::Client::new();
2383        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2384        assert_eq!(token, None);
2385    }
2386
2387    #[tokio::test]
2388    async fn test_resolve_token_localhost_auto_discover() {
2389        use axum::{Json, Router, routing::get};
2390
2391        let app = Router::new().route(
2392            "/api/v1/auth/token",
2393            get(|| async {
2394                Json(AuthTokenResponse {
2395                    token: "discovered".into(),
2396                })
2397            }),
2398        );
2399        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2400        let addr = listener.local_addr().unwrap();
2401        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2402
2403        let node = format!("localhost:{}", addr.port());
2404        let base = base_url(&node);
2405        let client = reqwest::Client::new();
2406        let token = resolve_token(&client, &base, &node, None).await;
2407        assert_eq!(token, Some("discovered".into()));
2408    }
2409
2410    #[tokio::test]
2411    async fn test_discover_token_empty_returns_none() {
2412        use axum::{Json, Router, routing::get};
2413
2414        let app = Router::new().route(
2415            "/api/v1/auth/token",
2416            get(|| async {
2417                Json(AuthTokenResponse {
2418                    token: String::new(),
2419                })
2420            }),
2421        );
2422        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2423        let addr = listener.local_addr().unwrap();
2424        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2425
2426        let base = format!("http://127.0.0.1:{}", addr.port());
2427        let client = reqwest::Client::new();
2428        assert_eq!(discover_token(&client, &base).await, None);
2429    }
2430
2431    #[tokio::test]
2432    async fn test_discover_token_unreachable_returns_none() {
2433        let client = reqwest::Client::new();
2434        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2435    }
2436
/// `--token <value>` given before the subcommand is parsed into `Cli::token`.
#[test]
fn test_cli_parse_with_token() {
    let argv = ["pulpo", "--token", "my-secret", "list"];
    let cli = Cli::try_parse_from(argv).unwrap();
    assert_eq!(cli.token.as_deref(), Some("my-secret"));
}
2442
/// Without `--token`, `Cli::token` is `None`.
#[test]
fn test_cli_parse_without_token() {
    let argv = ["pulpo", "list"];
    let cli = Cli::try_parse_from(argv).unwrap();
    assert!(cli.token.is_none());
}
2448
2449    #[tokio::test]
2450    async fn test_execute_with_explicit_token_sends_header() {
2451        use axum::{Router, extract::Request, http::StatusCode, routing::get};
2452
2453        let app = Router::new().route(
2454            "/api/v1/sessions",
2455            get(|req: Request| async move {
2456                let auth = req
2457                    .headers()
2458                    .get("authorization")
2459                    .and_then(|v| v.to_str().ok())
2460                    .unwrap_or("");
2461                assert_eq!(auth, "Bearer test-token");
2462                (StatusCode::OK, "[]".to_owned())
2463            }),
2464        );
2465        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2466        let addr = listener.local_addr().unwrap();
2467        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2468        let node = format!("127.0.0.1:{}", addr.port());
2469
2470        let cli = Cli {
2471            node,
2472            token: Some("test-token".into()),
2473            command: Commands::List,
2474        };
2475        let result = execute(&cli).await.unwrap();
2476        assert_eq!(result, "No sessions.");
2477    }
2478
2479    // -- Interventions tests --
2480
/// `pulpo interventions <name>` parses into `Commands::Interventions` with that name.
#[test]
fn test_cli_parse_interventions() {
    let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
    let Commands::Interventions { name } = &cli.command else {
        panic!("expected Interventions, got {:?}", cli.command);
    };
    assert_eq!(name, "my-session");
}
2489
/// An empty event list is rendered as the fixed "No intervention events." message.
#[test]
fn test_format_interventions_empty() {
    let rendered = format_interventions(&[]);
    assert_eq!(rendered, "No intervention events.");
}
2494
/// Intervention events are rendered as a table with ID / TIMESTAMP / REASON headers,
/// each event's reason and the first event's timestamp.
#[test]
fn test_format_interventions_with_data() {
    let memory_event = InterventionEventResponse {
        id: 1,
        session_id: "sess-1".into(),
        reason: "Memory exceeded threshold".into(),
        created_at: "2026-01-01T00:00:00Z".into(),
    };
    let idle_event = InterventionEventResponse {
        id: 2,
        session_id: "sess-1".into(),
        reason: "Idle for 10 minutes".into(),
        created_at: "2026-01-02T00:00:00Z".into(),
    };
    let events = vec![memory_event, idle_event];

    let rendered = format_interventions(&events);
    let expected_fragments = [
        "ID",
        "TIMESTAMP",
        "REASON",
        "Memory exceeded threshold",
        "Idle for 10 minutes",
        "2026-01-01T00:00:00Z",
    ];
    for fragment in expected_fragments {
        assert!(rendered.contains(fragment));
    }
}
2519
2520    #[tokio::test]
2521    async fn test_execute_interventions_empty() {
2522        let node = start_test_server().await;
2523        let cli = Cli {
2524            node,
2525            token: None,
2526            command: Commands::Interventions {
2527                name: "my-session".into(),
2528            },
2529        };
2530        let result = execute(&cli).await.unwrap();
2531        assert_eq!(result, "No intervention events.");
2532    }
2533
2534    #[tokio::test]
2535    async fn test_execute_interventions_with_data() {
2536        use axum::{Router, routing::get};
2537
2538        let app = Router::new().route(
2539            "/api/v1/sessions/{id}/interventions",
2540            get(|| async {
2541                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2542                    .to_owned()
2543            }),
2544        );
2545        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2546        let addr = listener.local_addr().unwrap();
2547        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2548        let node = format!("127.0.0.1:{}", addr.port());
2549
2550        let cli = Cli {
2551            node,
2552            token: None,
2553            command: Commands::Interventions {
2554                name: "test".into(),
2555            },
2556        };
2557        let result = execute(&cli).await.unwrap();
2558        assert!(result.contains("OOM"));
2559        assert!(result.contains("2026-01-01T00:00:00Z"));
2560    }
2561
2562    #[tokio::test]
2563    async fn test_execute_interventions_connection_refused() {
2564        let cli = Cli {
2565            node: "localhost:1".into(),
2566            token: None,
2567            command: Commands::Interventions {
2568                name: "test".into(),
2569            },
2570        };
2571        let result = execute(&cli).await;
2572        let err = result.unwrap_err().to_string();
2573        assert!(err.contains("Could not connect to pulpod"));
2574    }
2575
2576    // -- Attach command tests --
2577
/// `build_attach_command` produces `tmux attach-session -t <backend id>`.
#[test]
fn test_build_attach_command() {
    let cmd = build_attach_command("pulpo-my-session");

    assert_eq!(cmd.get_program(), "tmux");
    let argv = cmd.get_args().collect::<Vec<&std::ffi::OsStr>>();
    assert_eq!(argv, ["attach-session", "-t", "pulpo-my-session"]);
}
2585
/// `pulpo attach <name>` parses into `Commands::Attach` with that name.
#[test]
fn test_cli_parse_attach() {
    let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
    let Commands::Attach { name } = &cli.command else {
        panic!("expected Attach, got {:?}", cli.command);
    };
    assert_eq!(name, "my-session");
}
2594
/// The short alias `a` parses into `Commands::Attach`, just like `attach`.
#[test]
fn test_cli_parse_attach_alias() {
    let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
    let Commands::Attach { name } = &cli.command else {
        panic!("expected Attach, got {:?}", cli.command);
    };
    assert_eq!(name, "my-session");
}
2603
2604    #[tokio::test]
2605    async fn test_execute_attach_success() {
2606        let node = start_test_server().await;
2607        let cli = Cli {
2608            node,
2609            token: None,
2610            command: Commands::Attach {
2611                name: "test-session".into(),
2612            },
2613        };
2614        let result = execute(&cli).await.unwrap();
2615        assert!(result.contains("Detached from session test-session"));
2616    }
2617
2618    #[tokio::test]
2619    async fn test_execute_attach_with_backend_session_id() {
2620        use axum::{Router, routing::get};
2621        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","provider":"claude","prompt":"test","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"pulpo-my-session","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2622        let app = Router::new().route(
2623            "/api/v1/sessions/{id}",
2624            get(move || async move { session_json.to_owned() }),
2625        );
2626        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2627        let addr = listener.local_addr().unwrap();
2628        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2629
2630        let cli = Cli {
2631            node: format!("127.0.0.1:{}", addr.port()),
2632            token: None,
2633            command: Commands::Attach {
2634                name: "my-session".into(),
2635            },
2636        };
2637        let result = execute(&cli).await.unwrap();
2638        assert!(result.contains("Detached from session my-session"));
2639    }
2640
2641    #[tokio::test]
2642    async fn test_execute_attach_connection_refused() {
2643        let cli = Cli {
2644            node: "localhost:1".into(),
2645            token: None,
2646            command: Commands::Attach {
2647                name: "test-session".into(),
2648            },
2649        };
2650        let result = execute(&cli).await;
2651        let err = result.unwrap_err().to_string();
2652        assert!(err.contains("Could not connect to pulpod"));
2653    }
2654
2655    #[tokio::test]
2656    async fn test_execute_attach_error_response() {
2657        use axum::{Router, http::StatusCode, routing::get};
2658        let app = Router::new().route(
2659            "/api/v1/sessions/{id}",
2660            get(|| async {
2661                (
2662                    StatusCode::NOT_FOUND,
2663                    r#"{"error":"session not found"}"#.to_owned(),
2664                )
2665            }),
2666        );
2667        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2668        let addr = listener.local_addr().unwrap();
2669        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2670
2671        let cli = Cli {
2672            node: format!("127.0.0.1:{}", addr.port()),
2673            token: None,
2674            command: Commands::Attach {
2675                name: "nonexistent".into(),
2676            },
2677        };
2678        let result = execute(&cli).await;
2679        let err = result.unwrap_err().to_string();
2680        assert!(err.contains("session not found"));
2681    }
2682
2683    // -- Alias parse tests --
2684
2685    #[test]
2686    fn test_cli_parse_alias_spawn() {
2687        let cli = Cli::try_parse_from(["pulpo", "s", "--workdir", "/tmp", "Do it"]).unwrap();
2688        assert!(matches!(&cli.command, Commands::Spawn { .. }));
2689    }
2690
2691    #[test]
2692    fn test_cli_parse_alias_list() {
2693        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2694        assert!(matches!(cli.command, Commands::List));
2695    }
2696
2697    #[test]
2698    fn test_cli_parse_alias_logs() {
2699        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2700        assert!(matches!(
2701            &cli.command,
2702            Commands::Logs { name, .. } if name == "my-session"
2703        ));
2704    }
2705
2706    #[test]
2707    fn test_cli_parse_alias_kill() {
2708        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2709        assert!(matches!(
2710            &cli.command,
2711            Commands::Kill { name } if name == "my-session"
2712        ));
2713    }
2714
2715    #[test]
2716    fn test_cli_parse_alias_delete() {
2717        let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2718        assert!(matches!(
2719            &cli.command,
2720            Commands::Delete { name } if name == "my-session"
2721        ));
2722    }
2723
2724    #[test]
2725    fn test_cli_parse_alias_resume() {
2726        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
2727        assert!(matches!(
2728            &cli.command,
2729            Commands::Resume { name } if name == "my-session"
2730        ));
2731    }
2732
2733    #[test]
2734    fn test_cli_parse_alias_nodes() {
2735        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
2736        assert!(matches!(cli.command, Commands::Nodes));
2737    }
2738
2739    #[test]
2740    fn test_cli_parse_alias_interventions() {
2741        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
2742        assert!(matches!(
2743            &cli.command,
2744            Commands::Interventions { name } if name == "my-session"
2745        ));
2746    }
2747
2748    #[test]
2749    fn test_api_error_json() {
2750        let err = api_error("{\"error\":\"session not found: foo\"}");
2751        assert_eq!(err.to_string(), "session not found: foo");
2752    }
2753
2754    #[test]
2755    fn test_api_error_plain_text() {
2756        let err = api_error("plain text error");
2757        assert_eq!(err.to_string(), "plain text error");
2758    }
2759
2760    // -- diff_output tests --
2761
2762    #[test]
2763    fn test_diff_output_empty_prev() {
2764        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
2765    }
2766
2767    #[test]
2768    fn test_diff_output_identical() {
2769        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
2770    }
2771
2772    #[test]
2773    fn test_diff_output_new_lines_appended() {
2774        let prev = "line1\nline2";
2775        let new = "line1\nline2\nline3\nline4";
2776        assert_eq!(diff_output(prev, new), "line3\nline4");
2777    }
2778
2779    #[test]
2780    fn test_diff_output_scrolled_window() {
2781        // Window of 3 lines: old lines scroll off top, new appear at bottom
2782        let prev = "line1\nline2\nline3";
2783        let new = "line2\nline3\nline4";
2784        assert_eq!(diff_output(prev, new), "line4");
2785    }
2786
2787    #[test]
2788    fn test_diff_output_completely_different() {
2789        let prev = "aaa\nbbb";
2790        let new = "xxx\nyyy";
2791        assert_eq!(diff_output(prev, new), "xxx\nyyy");
2792    }
2793
2794    #[test]
2795    fn test_diff_output_last_line_matches_but_overlap_fails() {
2796        // Last line of prev appears in new but preceding lines don't match
2797        let prev = "aaa\ncommon";
2798        let new = "zzz\ncommon\nnew_line";
2799        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
2800        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
2801        // Falls through, no verified overlap, so returns everything
2802        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
2803    }
2804
2805    #[test]
2806    fn test_diff_output_new_empty() {
2807        assert_eq!(diff_output("line1", ""), "");
2808    }
2809
2810    // -- follow_logs tests --
2811
2812    /// Start a test server that simulates evolving output and session status transitions.
2813    async fn start_follow_test_server() -> String {
2814        use axum::{Router, extract::Path, extract::Query, routing::get};
2815        use std::sync::Arc;
2816        use std::sync::atomic::{AtomicUsize, Ordering};
2817
2818        let call_count = Arc::new(AtomicUsize::new(0));
2819        let output_count = call_count.clone();
2820        let status_count = Arc::new(AtomicUsize::new(0));
2821        let status_count_inner = status_count.clone();
2822
2823        let app = Router::new()
2824            .route(
2825                "/api/v1/sessions/{id}/output",
2826                get(
2827                    move |_path: Path<String>,
2828                          _query: Query<std::collections::HashMap<String, String>>| {
2829                        let count = output_count.clone();
2830                        async move {
2831                            let n = count.fetch_add(1, Ordering::SeqCst);
2832                            let output = match n {
2833                                0 => "line1\nline2".to_owned(),
2834                                1 => "line1\nline2\nline3".to_owned(),
2835                                _ => "line2\nline3\nline4".to_owned(),
2836                            };
2837                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2838                        }
2839                    },
2840                ),
2841            )
2842            .route(
2843                "/api/v1/sessions/{id}",
2844                get(move |_path: Path<String>| {
2845                    let count = status_count_inner.clone();
2846                    async move {
2847                        let n = count.fetch_add(1, Ordering::SeqCst);
2848                        let status = if n < 2 { "running" } else { "completed" };
2849                        format!(
2850                            r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
2851                        )
2852                    }
2853                }),
2854            );
2855
2856        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2857        let addr = listener.local_addr().unwrap();
2858        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2859        format!("http://127.0.0.1:{}", addr.port())
2860    }
2861
2862    #[tokio::test]
2863    async fn test_follow_logs_polls_and_exits_on_completed() {
2864        let base = start_follow_test_server().await;
2865        let client = reqwest::Client::new();
2866        let mut buf = Vec::new();
2867
2868        follow_logs(&client, &base, "test", 100, None, &mut buf)
2869            .await
2870            .unwrap();
2871
2872        let output = String::from_utf8(buf).unwrap();
2873        // Should contain initial output + new lines
2874        assert!(output.contains("line1"));
2875        assert!(output.contains("line2"));
2876        assert!(output.contains("line3"));
2877        assert!(output.contains("line4"));
2878    }
2879
2880    #[tokio::test]
2881    async fn test_execute_logs_follow_success() {
2882        let base = start_follow_test_server().await;
2883        // Extract host:port from http://127.0.0.1:PORT
2884        let node = base.strip_prefix("http://").unwrap().to_owned();
2885
2886        let cli = Cli {
2887            node,
2888            token: None,
2889            command: Commands::Logs {
2890                name: "test".into(),
2891                lines: 100,
2892                follow: true,
2893            },
2894        };
2895        // execute() with follow writes to stdout and returns empty string
2896        let result = execute(&cli).await.unwrap();
2897        assert_eq!(result, "");
2898    }
2899
2900    #[tokio::test]
2901    async fn test_execute_logs_follow_connection_refused() {
2902        let cli = Cli {
2903            node: "localhost:1".into(),
2904            token: None,
2905            command: Commands::Logs {
2906                name: "test".into(),
2907                lines: 50,
2908                follow: true,
2909            },
2910        };
2911        let result = execute(&cli).await;
2912        let err = result.unwrap_err().to_string();
2913        assert!(
2914            err.contains("Could not connect to pulpod"),
2915            "Expected friendly error, got: {err}"
2916        );
2917    }
2918
2919    #[tokio::test]
2920    async fn test_follow_logs_exits_on_dead() {
2921        use axum::{Router, extract::Path, extract::Query, routing::get};
2922
2923        let app = Router::new()
2924            .route(
2925                "/api/v1/sessions/{id}/output",
2926                get(
2927                    |_path: Path<String>,
2928                     _query: Query<std::collections::HashMap<String, String>>| async {
2929                        r#"{"output":"some output"}"#.to_owned()
2930                    },
2931                ),
2932            )
2933            .route(
2934                "/api/v1/sessions/{id}",
2935                get(|_path: Path<String>| async {
2936                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2937                }),
2938            );
2939
2940        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2941        let addr = listener.local_addr().unwrap();
2942        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2943        let base = format!("http://127.0.0.1:{}", addr.port());
2944
2945        let client = reqwest::Client::new();
2946        let mut buf = Vec::new();
2947        follow_logs(&client, &base, "test", 100, None, &mut buf)
2948            .await
2949            .unwrap();
2950
2951        let output = String::from_utf8(buf).unwrap();
2952        assert!(output.contains("some output"));
2953    }
2954
2955    #[tokio::test]
2956    async fn test_follow_logs_exits_on_stale() {
2957        use axum::{Router, extract::Path, extract::Query, routing::get};
2958
2959        let app = Router::new()
2960            .route(
2961                "/api/v1/sessions/{id}/output",
2962                get(
2963                    |_path: Path<String>,
2964                     _query: Query<std::collections::HashMap<String, String>>| async {
2965                        r#"{"output":"stale output"}"#.to_owned()
2966                    },
2967                ),
2968            )
2969            .route(
2970                "/api/v1/sessions/{id}",
2971                get(|_path: Path<String>| async {
2972                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2973                }),
2974            );
2975
2976        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2977        let addr = listener.local_addr().unwrap();
2978        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2979        let base = format!("http://127.0.0.1:{}", addr.port());
2980
2981        let client = reqwest::Client::new();
2982        let mut buf = Vec::new();
2983        follow_logs(&client, &base, "test", 100, None, &mut buf)
2984            .await
2985            .unwrap();
2986
2987        let output = String::from_utf8(buf).unwrap();
2988        assert!(output.contains("stale output"));
2989    }
2990
2991    #[tokio::test]
2992    async fn test_execute_logs_follow_non_reqwest_error() {
2993        use axum::{Router, extract::Path, extract::Query, routing::get};
2994
2995        // Session status endpoint returns invalid JSON to trigger a serde error
2996        let app = Router::new()
2997            .route(
2998                "/api/v1/sessions/{id}/output",
2999                get(
3000                    |_path: Path<String>,
3001                     _query: Query<std::collections::HashMap<String, String>>| async {
3002                        r#"{"output":"initial"}"#.to_owned()
3003                    },
3004                ),
3005            )
3006            .route(
3007                "/api/v1/sessions/{id}",
3008                get(|_path: Path<String>| async { "not valid json".to_owned() }),
3009            );
3010
3011        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3012        let addr = listener.local_addr().unwrap();
3013        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3014        let node = format!("127.0.0.1:{}", addr.port());
3015
3016        let cli = Cli {
3017            node,
3018            token: None,
3019            command: Commands::Logs {
3020                name: "test".into(),
3021                lines: 100,
3022                follow: true,
3023            },
3024        };
3025        let err = execute(&cli).await.unwrap_err();
3026        // serde_json error, not a reqwest error — hits the Err(other) branch
3027        let msg = err.to_string();
3028        assert!(
3029            msg.contains("expected ident"),
3030            "Expected serde parse error, got: {msg}"
3031        );
3032    }
3033
3034    #[test]
3035    fn test_cli_parse_spawn_with_guardrails() {
3036        let cli = Cli::try_parse_from([
3037            "pulpo",
3038            "spawn",
3039            "--workdir",
3040            "/tmp",
3041            "--max-turns",
3042            "10",
3043            "--max-budget",
3044            "5.5",
3045            "--output-format",
3046            "json",
3047            "Do it",
3048        ])
3049        .unwrap();
3050        assert!(matches!(
3051            &cli.command,
3052            Commands::Spawn { max_turns, max_budget, output_format, .. }
3053                if *max_turns == Some(10) && *max_budget == Some(5.5)
3054                && output_format.as_deref() == Some("json")
3055        ));
3056    }
3057
3058    #[tokio::test]
3059    async fn test_fetch_session_status_connection_error() {
3060        let client = reqwest::Client::new();
3061        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3062        assert!(result.is_err());
3063    }
3064
3065    // -- Crontab wrapper tests --
3066
3067    #[test]
3068    fn test_build_crontab_line() {
3069        let line = build_crontab_line(
3070            "nightly-review",
3071            "0 3 * * *",
3072            "/home/me/repo",
3073            "claude",
3074            "Review PRs",
3075            "localhost:7433",
3076        );
3077        assert_eq!(
3078            line,
3079            "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n"
3080        );
3081    }
3082
3083    #[test]
3084    fn test_crontab_install_success() {
3085        let crontab = "# existing cron\n0 * * * * echo hi\n";
3086        let line = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";
3087        let result = crontab_install(crontab, "my-job", line).unwrap();
3088        assert!(result.starts_with("# existing cron\n"));
3089        assert!(result.ends_with("#pulpo:my-job\n"));
3090        assert!(result.contains("echo hi"));
3091    }
3092
3093    #[test]
3094    fn test_crontab_install_duplicate_error() {
3095        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3096        let line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
3097        let err = crontab_install(crontab, "my-job", line).unwrap_err();
3098        assert!(err.to_string().contains("already exists"));
3099    }
3100
3101    #[test]
3102    fn test_crontab_list_empty() {
3103        assert_eq!(crontab_list(""), "No pulpo schedules.");
3104    }
3105
3106    #[test]
3107    fn test_crontab_list_no_pulpo_entries() {
3108        assert_eq!(crontab_list("0 * * * * echo hi\n"), "No pulpo schedules.");
3109    }
3110
3111    #[test]
3112    fn test_crontab_list_with_entries() {
3113        let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
3114        let output = crontab_list(crontab);
3115        assert!(output.contains("NAME"));
3116        assert!(output.contains("CRON"));
3117        assert!(output.contains("PAUSED"));
3118        assert!(output.contains("nightly"));
3119        assert!(output.contains("0 3 * * *"));
3120        assert!(output.contains("no"));
3121    }
3122
3123    #[test]
3124    fn test_crontab_list_paused_entry() {
3125        let crontab = "#0 3 * * * pulpo spawn task #pulpo:paused-job\n";
3126        let output = crontab_list(crontab);
3127        assert!(output.contains("paused-job"));
3128        assert!(output.contains("yes"));
3129    }
3130
3131    #[test]
3132    fn test_crontab_list_short_line() {
3133        // A line with fewer than 5 space-separated parts but still tagged
3134        let crontab = "badcron #pulpo:broken\n";
3135        let output = crontab_list(crontab);
3136        assert!(output.contains("broken"));
3137        assert!(output.contains('?'));
3138    }
3139
3140    #[test]
3141    fn test_crontab_remove_success() {
3142        let crontab = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
3143        let result = crontab_remove(crontab, "my-job").unwrap();
3144        assert!(result.contains("echo hi"));
3145        assert!(!result.contains("my-job"));
3146    }
3147
3148    #[test]
3149    fn test_crontab_remove_not_found() {
3150        let crontab = "0 * * * * echo hi\n";
3151        let err = crontab_remove(crontab, "ghost").unwrap_err();
3152        assert!(err.to_string().contains("not found"));
3153    }
3154
3155    #[test]
3156    fn test_crontab_pause_success() {
3157        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3158        let result = crontab_pause(crontab, "my-job").unwrap();
3159        assert!(result.starts_with('#'));
3160        assert!(result.contains("#pulpo:my-job"));
3161    }
3162
3163    #[test]
3164    fn test_crontab_pause_not_found() {
3165        let crontab = "0 * * * * echo hi\n";
3166        let err = crontab_pause(crontab, "ghost").unwrap_err();
3167        assert!(err.to_string().contains("not found or already paused"));
3168    }
3169
3170    #[test]
3171    fn test_crontab_pause_already_paused() {
3172        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3173        let err = crontab_pause(crontab, "my-job").unwrap_err();
3174        assert!(err.to_string().contains("already paused"));
3175    }
3176
3177    #[test]
3178    fn test_crontab_resume_success() {
3179        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3180        let result = crontab_resume(crontab, "my-job").unwrap();
3181        assert!(!result.starts_with('#'));
3182        assert!(result.contains("#pulpo:my-job"));
3183    }
3184
3185    #[test]
3186    fn test_crontab_resume_not_found() {
3187        let crontab = "0 * * * * echo hi\n";
3188        let err = crontab_resume(crontab, "ghost").unwrap_err();
3189        assert!(err.to_string().contains("not found or not paused"));
3190    }
3191
3192    #[test]
3193    fn test_crontab_resume_not_paused() {
3194        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3195        let err = crontab_resume(crontab, "my-job").unwrap_err();
3196        assert!(err.to_string().contains("not paused"));
3197    }
3198
3199    // -- Schedule CLI parse tests --
3200
3201    #[test]
3202    fn test_cli_parse_schedule_install() {
3203        let cli = Cli::try_parse_from([
3204            "pulpo",
3205            "schedule",
3206            "install",
3207            "nightly",
3208            "0 3 * * *",
3209            "--workdir",
3210            "/tmp/repo",
3211            "Review",
3212            "PRs",
3213        ])
3214        .unwrap();
3215        assert!(matches!(
3216            &cli.command,
3217            Commands::Schedule {
3218                action: ScheduleAction::Install { name, cron, workdir, provider, prompt }
3219            } if name == "nightly" && cron == "0 3 * * *" && workdir == "/tmp/repo"
3220              && provider == "claude" && prompt == &["Review", "PRs"]
3221        ));
3222    }
3223
3224    #[test]
3225    fn test_cli_parse_schedule_list() {
3226        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3227        assert!(matches!(
3228            &cli.command,
3229            Commands::Schedule {
3230                action: ScheduleAction::List
3231            }
3232        ));
3233    }
3234
3235    #[test]
3236    fn test_cli_parse_schedule_remove() {
3237        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3238        assert!(matches!(
3239            &cli.command,
3240            Commands::Schedule {
3241                action: ScheduleAction::Remove { name }
3242            } if name == "nightly"
3243        ));
3244    }
3245
3246    #[test]
3247    fn test_cli_parse_schedule_pause() {
3248        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3249        assert!(matches!(
3250            &cli.command,
3251            Commands::Schedule {
3252                action: ScheduleAction::Pause { name }
3253            } if name == "nightly"
3254        ));
3255    }
3256
3257    #[test]
3258    fn test_cli_parse_schedule_resume() {
3259        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3260        assert!(matches!(
3261            &cli.command,
3262            Commands::Schedule {
3263                action: ScheduleAction::Resume { name }
3264            } if name == "nightly"
3265        ));
3266    }
3267
3268    #[test]
3269    fn test_cli_parse_schedule_alias() {
3270        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3271        assert!(matches!(
3272            &cli.command,
3273            Commands::Schedule {
3274                action: ScheduleAction::List
3275            }
3276        ));
3277    }
3278
3279    #[test]
3280    fn test_cli_parse_schedule_list_alias() {
3281        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3282        assert!(matches!(
3283            &cli.command,
3284            Commands::Schedule {
3285                action: ScheduleAction::List
3286            }
3287        ));
3288    }
3289
3290    #[test]
3291    fn test_cli_parse_schedule_remove_alias() {
3292        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3293        assert!(matches!(
3294            &cli.command,
3295            Commands::Schedule {
3296                action: ScheduleAction::Remove { name }
3297            } if name == "nightly"
3298        ));
3299    }
3300
3301    #[test]
3302    fn test_cli_parse_schedule_install_custom_provider() {
3303        let cli = Cli::try_parse_from([
3304            "pulpo",
3305            "schedule",
3306            "install",
3307            "daily",
3308            "0 9 * * *",
3309            "--workdir",
3310            "/tmp",
3311            "--provider",
3312            "codex",
3313            "Run tests",
3314        ])
3315        .unwrap();
3316        assert!(matches!(
3317            &cli.command,
3318            Commands::Schedule {
3319                action: ScheduleAction::Install { provider, .. }
3320            } if provider == "codex"
3321        ));
3322    }
3323
3324    #[tokio::test]
3325    async fn test_execute_schedule_via_execute() {
3326        // Under coverage builds, execute_schedule is a stub returning Ok("")
3327        let node = start_test_server().await;
3328        let cli = Cli {
3329            node,
3330            token: None,
3331            command: Commands::Schedule {
3332                action: ScheduleAction::List,
3333            },
3334        };
3335        let result = execute(&cli).await;
3336        // Under coverage: succeeds with empty string; under non-coverage: may fail (no crontab)
3337        assert!(result.is_ok() || result.is_err());
3338    }
3339
3340    #[test]
3341    fn test_schedule_action_debug() {
3342        let action = ScheduleAction::List;
3343        assert_eq!(format!("{action:?}"), "List");
3344    }
3345
3346    // ── Knowledge CLI tests ─────────────────────────────────────────────
3347
3348    #[test]
3349    fn test_cli_parse_knowledge() {
3350        let cli = Cli::try_parse_from(["pulpo", "knowledge"]).unwrap();
3351        assert!(matches!(cli.command, Commands::Knowledge { .. }));
3352    }
3353
3354    #[test]
3355    fn test_cli_parse_knowledge_alias() {
3356        let cli = Cli::try_parse_from(["pulpo", "kn"]).unwrap();
3357        assert!(matches!(cli.command, Commands::Knowledge { .. }));
3358    }
3359
3360    #[test]
3361    fn test_cli_parse_knowledge_with_filters() {
3362        let cli = Cli::try_parse_from([
3363            "pulpo",
3364            "knowledge",
3365            "--kind",
3366            "failure",
3367            "--repo",
3368            "/tmp/repo",
3369            "--ink",
3370            "coder",
3371            "--limit",
3372            "5",
3373        ])
3374        .unwrap();
3375        match &cli.command {
3376            Commands::Knowledge {
3377                kind,
3378                repo,
3379                ink,
3380                limit,
3381                ..
3382            } => {
3383                assert_eq!(kind.as_deref(), Some("failure"));
3384                assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3385                assert_eq!(ink.as_deref(), Some("coder"));
3386                assert_eq!(*limit, 5);
3387            }
3388            _ => panic!("expected Knowledge command"),
3389        }
3390    }
3391
3392    #[test]
3393    fn test_cli_parse_knowledge_context() {
3394        let cli = Cli::try_parse_from(["pulpo", "knowledge", "--context", "--repo", "/tmp/repo"])
3395            .unwrap();
3396        match &cli.command {
3397            Commands::Knowledge { context, repo, .. } => {
3398                assert!(*context);
3399                assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3400            }
3401            _ => panic!("expected Knowledge command"),
3402        }
3403    }
3404
3405    #[test]
3406    fn test_format_knowledge_empty() {
3407        assert_eq!(format_knowledge(&[]), "No knowledge found.");
3408    }
3409
3410    #[test]
3411    fn test_format_knowledge_items() {
3412        use chrono::Utc;
3413        use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3414        use uuid::Uuid;
3415
3416        let items = vec![
3417            Knowledge {
3418                id: Uuid::new_v4(),
3419                session_id: Uuid::new_v4(),
3420                kind: KnowledgeKind::Summary,
3421                scope_repo: Some("/tmp/repo".into()),
3422                scope_ink: Some("coder".into()),
3423                title: "Fixed the auth bug".into(),
3424                body: "Details".into(),
3425                tags: vec!["claude".into(), "completed".into()],
3426                relevance: 0.7,
3427                created_at: Utc::now(),
3428            },
3429            Knowledge {
3430                id: Uuid::new_v4(),
3431                session_id: Uuid::new_v4(),
3432                kind: KnowledgeKind::Failure,
3433                scope_repo: None,
3434                scope_ink: None,
3435                title: "OOM crash during build".into(),
3436                body: "Details".into(),
3437                tags: vec!["failure".into()],
3438                relevance: 0.9,
3439                created_at: Utc::now(),
3440            },
3441        ];
3442
3443        let output = format_knowledge(&items);
3444        assert!(output.contains("KIND"));
3445        assert!(output.contains("TITLE"));
3446        assert!(output.contains("summary"));
3447        assert!(output.contains("failure"));
3448        assert!(output.contains("Fixed the auth bug"));
3449        assert!(output.contains("repo"));
3450        assert!(output.contains("0.70"));
3451    }
3452
3453    #[test]
3454    fn test_format_knowledge_long_title_truncated() {
3455        use chrono::Utc;
3456        use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3457        use uuid::Uuid;
3458
3459        let items = vec![Knowledge {
3460            id: Uuid::new_v4(),
3461            session_id: Uuid::new_v4(),
3462            kind: KnowledgeKind::Summary,
3463            scope_repo: Some("/repo".into()),
3464            scope_ink: None,
3465            title: "A very long title that exceeds the maximum display width for knowledge items in the CLI".into(),
3466            body: "Body".into(),
3467            tags: vec![],
3468            relevance: 0.5,
3469            created_at: Utc::now(),
3470        }];
3471
3472        let output = format_knowledge(&items);
3473        assert!(output.contains('…'));
3474    }
3475
3476    #[test]
3477    fn test_cli_parse_knowledge_get() {
3478        let cli = Cli::try_parse_from(["pulpo", "knowledge", "--get", "abc-123"]).unwrap();
3479        match &cli.command {
3480            Commands::Knowledge { get, .. } => {
3481                assert_eq!(get.as_deref(), Some("abc-123"));
3482            }
3483            _ => panic!("expected Knowledge command"),
3484        }
3485    }
3486
3487    #[test]
3488    fn test_cli_parse_knowledge_delete() {
3489        let cli = Cli::try_parse_from(["pulpo", "knowledge", "--delete", "abc-123"]).unwrap();
3490        match &cli.command {
3491            Commands::Knowledge { delete, .. } => {
3492                assert_eq!(delete.as_deref(), Some("abc-123"));
3493            }
3494            _ => panic!("expected Knowledge command"),
3495        }
3496    }
3497
3498    #[test]
3499    fn test_cli_parse_knowledge_push() {
3500        let cli = Cli::try_parse_from(["pulpo", "knowledge", "--push"]).unwrap();
3501        match &cli.command {
3502            Commands::Knowledge { push, .. } => {
3503                assert!(*push);
3504            }
3505            _ => panic!("expected Knowledge command"),
3506        }
3507    }
3508
3509    #[test]
3510    fn test_format_knowledge_no_repo() {
3511        use chrono::Utc;
3512        use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3513        use uuid::Uuid;
3514
3515        let items = vec![Knowledge {
3516            id: Uuid::new_v4(),
3517            session_id: Uuid::new_v4(),
3518            kind: KnowledgeKind::Summary,
3519            scope_repo: None,
3520            scope_ink: None,
3521            title: "Global finding".into(),
3522            body: "Body".into(),
3523            tags: vec![],
3524            relevance: 0.5,
3525            created_at: Utc::now(),
3526        }];
3527
3528        let output = format_knowledge(&items);
3529        assert!(output.contains('-')); // "-" for no repo
3530    }
3531}