Skip to main content

pulpo_cli/
lib.rs

1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4    AuthTokenResponse, InterventionEventResponse, KnowledgeDeleteResponse, KnowledgeItemResponse,
5    KnowledgePushResponse, KnowledgeResponse, PeersResponse,
6};
7use pulpo_common::knowledge::Knowledge;
8use pulpo_common::session::Session;
9
10#[derive(Parser, Debug)]
11#[command(
12    name = "pulpo",
13    about = "Manage agent sessions across your machines",
14    version
15)]
16pub struct Cli {
17    /// Target node (default: localhost)
18    #[arg(long, default_value = "localhost:7433")]
19    pub node: String,
20
21    /// Auth token (auto-discovered from local daemon if omitted)
22    #[arg(long)]
23    pub token: Option<String>,
24
25    #[command(subcommand)]
26    pub command: Commands,
27}
28
29#[derive(Subcommand, Debug)]
30#[allow(clippy::large_enum_variant)]
31pub enum Commands {
32    /// Attach to a session's terminal
33    #[command(visible_alias = "a")]
34    Attach {
35        /// Session name or ID
36        name: String,
37    },
38
39    /// Send input to a session
40    #[command(visible_alias = "i")]
41    Input {
42        /// Session name or ID
43        name: String,
44        /// Text to send (sends Enter if omitted)
45        text: Option<String>,
46    },
47
48    /// Spawn a new agent session
49    #[command(visible_alias = "s")]
50    Spawn {
51        /// Working directory (defaults to current directory)
52        #[arg(long)]
53        workdir: Option<String>,
54
55        /// Session name (auto-derived from workdir if omitted)
56        #[arg(long)]
57        name: Option<String>,
58
59        /// Agent provider (claude, codex, gemini, opencode). Uses config `default_provider` or claude.
60        #[arg(long)]
61        provider: Option<String>,
62
63        /// Run in autonomous mode (fire-and-forget)
64        #[arg(long)]
65        auto: bool,
66
67        /// Disable all safety guardrails
68        #[arg(long)]
69        unrestricted: bool,
70
71        /// Model override (e.g. opus, sonnet)
72        #[arg(long)]
73        model: Option<String>,
74
75        /// System prompt to append
76        #[arg(long)]
77        system_prompt: Option<String>,
78
79        /// Explicit allowed tools (comma-separated)
80        #[arg(long, value_delimiter = ',')]
81        allowed_tools: Option<Vec<String>>,
82
83        /// Ink name (from config)
84        #[arg(long)]
85        ink: Option<String>,
86
87        /// Maximum agent turns before stopping
88        #[arg(long)]
89        max_turns: Option<u32>,
90
91        /// Maximum budget in USD before stopping
92        #[arg(long)]
93        max_budget: Option<f64>,
94
95        /// Output format (e.g. json, stream-json)
96        #[arg(long)]
97        output_format: Option<String>,
98
99        /// Task prompt
100        prompt: Vec<String>,
101    },
102
103    /// List all sessions
104    #[command(visible_alias = "ls")]
105    List,
106
107    /// Show session logs/output
108    #[command(visible_alias = "l")]
109    Logs {
110        /// Session name or ID
111        name: String,
112
113        /// Number of lines to fetch
114        #[arg(long, default_value = "100")]
115        lines: usize,
116
117        /// Follow output (like `tail -f`)
118        #[arg(short, long)]
119        follow: bool,
120    },
121
122    /// Kill a session
123    #[command(visible_alias = "k")]
124    Kill {
125        /// Session name or ID
126        name: String,
127    },
128
129    /// Permanently remove a session from history
130    #[command(visible_alias = "rm")]
131    Delete {
132        /// Session name or ID
133        name: String,
134    },
135
136    /// Resume a stale session
137    #[command(visible_alias = "r")]
138    Resume {
139        /// Session name or ID
140        name: String,
141    },
142
143    /// List all known nodes
144    #[command(visible_alias = "n")]
145    Nodes,
146
147    /// Show intervention history for a session
148    #[command(visible_alias = "iv")]
149    Interventions {
150        /// Session name or ID
151        name: String,
152    },
153
154    /// Open the web dashboard in your browser
155    Ui,
156
157    /// Query extracted knowledge from past sessions
158    #[command(visible_alias = "kn")]
159    Knowledge {
160        /// Filter by session ID
161        #[arg(long)]
162        session: Option<String>,
163
164        /// Filter by kind (summary, failure)
165        #[arg(long)]
166        kind: Option<String>,
167
168        /// Filter by repo/workdir
169        #[arg(long)]
170        repo: Option<String>,
171
172        /// Filter by ink name
173        #[arg(long)]
174        ink: Option<String>,
175
176        /// Maximum results
177        #[arg(long, default_value = "20")]
178        limit: usize,
179
180        /// Context mode: find relevant knowledge for a workdir
181        #[arg(long)]
182        context: bool,
183
184        /// Get a single knowledge item by ID
185        #[arg(long)]
186        get: Option<String>,
187
188        /// Delete a knowledge item by ID
189        #[arg(long)]
190        delete: Option<String>,
191
192        /// Push local knowledge to configured remote
193        #[arg(long)]
194        push: bool,
195    },
196
197    /// Manage scheduled agent runs via crontab
198    #[command(visible_alias = "sched")]
199    Schedule {
200        #[command(subcommand)]
201        action: ScheduleAction,
202    },
203}
204
205#[derive(Subcommand, Debug)]
206pub enum ScheduleAction {
207    /// Install a cron schedule that spawns a session
208    Install {
209        /// Schedule name
210        name: String,
211        /// Cron expression (e.g. "0 3 * * *")
212        cron: String,
213        /// Working directory
214        #[arg(long)]
215        workdir: String,
216        /// Agent provider
217        #[arg(long, default_value = "claude")]
218        provider: String,
219        /// Task prompt
220        prompt: Vec<String>,
221    },
222    /// List installed pulpo cron schedules
223    #[command(alias = "ls")]
224    List,
225    /// Remove a cron schedule
226    #[command(alias = "rm")]
227    Remove {
228        /// Schedule name
229        name: String,
230    },
231    /// Pause a cron schedule (comments out the line)
232    Pause {
233        /// Schedule name
234        name: String,
235    },
236    /// Resume a paused cron schedule (uncomments the line)
237    Resume {
238        /// Schedule name
239        name: String,
240    },
241}
242
243/// Format the base URL from the node address.
244pub fn base_url(node: &str) -> String {
245    format!("http://{node}")
246}
247
248/// Response shape for the output endpoint.
249#[derive(serde::Deserialize)]
250struct OutputResponse {
251    output: String,
252}
253
254/// Format a list of sessions as a table.
255fn format_sessions(sessions: &[Session]) -> String {
256    if sessions.is_empty() {
257        return "No sessions.".into();
258    }
259    let mut lines = vec![format!(
260        "{:<20} {:<12} {:<10} {:<14} {}",
261        "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
262    )];
263    for s in sessions {
264        let prompt_display = if s.prompt.len() > 40 {
265            format!("{}...", &s.prompt[..37])
266        } else {
267            s.prompt.clone()
268        };
269        let status_display = if s.waiting_for_input {
270            "waiting".to_owned()
271        } else {
272            s.status.to_string()
273        };
274        lines.push(format!(
275            "{:<20} {:<12} {:<10} {:<14} {}",
276            s.name, status_display, s.provider, s.mode, prompt_display
277        ));
278    }
279    lines.join("\n")
280}
281
282/// Format the peers response as a table.
283fn format_nodes(resp: &PeersResponse) -> String {
284    let mut lines = vec![format!(
285        "{:<20} {:<25} {:<10} {}",
286        "NAME", "ADDRESS", "STATUS", "SESSIONS"
287    )];
288    lines.push(format!(
289        "{:<20} {:<25} {:<10} {}",
290        resp.local.name, "(local)", "online", "-"
291    ));
292    for p in &resp.peers {
293        let sessions = p
294            .session_count
295            .map_or_else(|| "-".into(), |c| c.to_string());
296        lines.push(format!(
297            "{:<20} {:<25} {:<10} {}",
298            p.name, p.address, p.status, sessions
299        ));
300    }
301    lines.join("\n")
302}
303
304/// Format intervention events as a table.
305fn format_interventions(events: &[InterventionEventResponse]) -> String {
306    if events.is_empty() {
307        return "No intervention events.".into();
308    }
309    let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
310    for e in events {
311        lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
312    }
313    lines.join("\n")
314}
315
316/// Format knowledge items as a table.
317fn format_knowledge(items: &[Knowledge]) -> String {
318    if items.is_empty() {
319        return "No knowledge found.".into();
320    }
321    let mut lines = vec![format!(
322        "{:<10} {:<40} {:<10} {:<6} {}",
323        "KIND", "TITLE", "REPO", "REL", "TAGS"
324    )];
325    for k in items {
326        let title = if k.title.len() > 38 {
327            format!("{}…", &k.title[..37])
328        } else {
329            k.title.clone()
330        };
331        let repo = k
332            .scope_repo
333            .as_deref()
334            .and_then(|r| r.rsplit('/').next())
335            .unwrap_or("-");
336        let tags = k.tags.join(",");
337        lines.push(format!(
338            "{:<10} {:<40} {:<10} {:<6.2} {}",
339            k.kind, title, repo, k.relevance, tags
340        ));
341    }
342    lines.join("\n")
343}
344
345/// Build the command to open a URL in the default browser.
346#[cfg_attr(coverage, allow(dead_code))]
347fn build_open_command(url: &str) -> std::process::Command {
348    #[cfg(target_os = "macos")]
349    {
350        let mut cmd = std::process::Command::new("open");
351        cmd.arg(url);
352        cmd
353    }
354    #[cfg(target_os = "linux")]
355    {
356        let mut cmd = std::process::Command::new("xdg-open");
357        cmd.arg(url);
358        cmd
359    }
360    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
361    {
362        // Fallback: try xdg-open
363        let mut cmd = std::process::Command::new("xdg-open");
364        cmd.arg(url);
365        cmd
366    }
367}
368
369/// Open a URL in the default browser.
370#[cfg(not(coverage))]
371fn open_browser(url: &str) -> Result<()> {
372    build_open_command(url).status()?;
373    Ok(())
374}
375
376/// Stub for coverage builds — avoids opening a browser during tests.
377#[cfg(coverage)]
378fn open_browser(_url: &str) -> Result<()> {
379    Ok(())
380}
381
382/// Build the command to attach to a session's terminal.
383/// Takes the backend session ID (e.g. `pulpo-my-session`) — the tmux session name.
384#[cfg_attr(coverage, allow(dead_code))]
385fn build_attach_command(backend_session_id: &str) -> std::process::Command {
386    let mut cmd = std::process::Command::new("tmux");
387    cmd.args(["attach-session", "-t", backend_session_id]);
388    cmd
389}
390
391/// Attach to a session's terminal.
392#[cfg(not(any(test, coverage)))]
393fn attach_session(backend_session_id: &str) -> Result<()> {
394    let status = build_attach_command(backend_session_id).status()?;
395    if !status.success() {
396        anyhow::bail!("attach failed with {status}");
397    }
398    Ok(())
399}
400
401/// Stub for test and coverage builds — avoids spawning real terminals during tests.
402#[cfg(any(test, coverage))]
403#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
404fn attach_session(_backend_session_id: &str) -> Result<()> {
405    Ok(())
406}
407
408/// Extract a clean error message from an API JSON response (or fall back to raw text).
409fn api_error(text: &str) -> anyhow::Error {
410    serde_json::from_str::<serde_json::Value>(text)
411        .ok()
412        .and_then(|v| v["error"].as_str().map(String::from))
413        .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
414}
415
416/// Return the response body text, or a clean error if the response was non-success.
417async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
418    if resp.status().is_success() {
419        Ok(resp.text().await?)
420    } else {
421        let text = resp.text().await?;
422        Err(api_error(&text))
423    }
424}
425
426/// Map a reqwest error to a user-friendly message.
427fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
428    if err.is_connect() {
429        anyhow::anyhow!(
430            "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
431        )
432    } else {
433        anyhow::anyhow!("Network error connecting to {node}: {err}")
434    }
435}
436
437/// Check if the node address points to localhost.
438fn is_localhost(node: &str) -> bool {
439    let host = node.split(':').next().unwrap_or(node);
440    host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
441}
442
443/// Try to auto-discover the auth token from a local daemon.
444async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
445    let resp = client
446        .get(format!("{base}/api/v1/auth/token"))
447        .send()
448        .await
449        .ok()?;
450    let body: AuthTokenResponse = resp.json().await.ok()?;
451    if body.token.is_empty() {
452        None
453    } else {
454        Some(body.token)
455    }
456}
457
458/// Resolve the auth token: use explicit `--token`, auto-discover from localhost, or `None`.
459async fn resolve_token(
460    client: &reqwest::Client,
461    base: &str,
462    node: &str,
463    explicit: Option<&str>,
464) -> Option<String> {
465    if let Some(t) = explicit {
466        return Some(t.to_owned());
467    }
468    if is_localhost(node) {
469        return discover_token(client, base).await;
470    }
471    None
472}
473
474/// Build an authenticated GET request.
475fn authed_get(
476    client: &reqwest::Client,
477    url: String,
478    token: Option<&str>,
479) -> reqwest::RequestBuilder {
480    let req = client.get(url);
481    if let Some(t) = token {
482        req.bearer_auth(t)
483    } else {
484        req
485    }
486}
487
488/// Build an authenticated POST request.
489fn authed_post(
490    client: &reqwest::Client,
491    url: String,
492    token: Option<&str>,
493) -> reqwest::RequestBuilder {
494    let req = client.post(url);
495    if let Some(t) = token {
496        req.bearer_auth(t)
497    } else {
498        req
499    }
500}
501
502/// Build an authenticated DELETE request.
503fn authed_delete(
504    client: &reqwest::Client,
505    url: String,
506    token: Option<&str>,
507) -> reqwest::RequestBuilder {
508    let req = client.delete(url);
509    if let Some(t) = token {
510        req.bearer_auth(t)
511    } else {
512        req
513    }
514}
515
516/// Fetch session output from the API.
517async fn fetch_output(
518    client: &reqwest::Client,
519    base: &str,
520    name: &str,
521    lines: usize,
522    token: Option<&str>,
523) -> Result<String> {
524    let resp = authed_get(
525        client,
526        format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
527        token,
528    )
529    .send()
530    .await?;
531    let text = ok_or_api_error(resp).await?;
532    let output: OutputResponse = serde_json::from_str(&text)?;
533    Ok(output.output)
534}
535
536/// Fetch session status from the API.
537async fn fetch_session_status(
538    client: &reqwest::Client,
539    base: &str,
540    name: &str,
541    token: Option<&str>,
542) -> Result<String> {
543    let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
544        .send()
545        .await?;
546    let text = ok_or_api_error(resp).await?;
547    let session: Session = serde_json::from_str(&text)?;
548    Ok(session.status.to_string())
549}
550
551/// Compute the new trailing lines that differ from the previous output.
552///
553/// The output endpoint returns the last N lines from the terminal pane. As new lines
554/// appear, old lines at the top scroll off. We find the overlap between the end
555/// of `prev` and the beginning-to-middle of `new`, then return only the truly new
556/// trailing lines.
557fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
558    if prev.is_empty() {
559        return new;
560    }
561
562    let prev_lines: Vec<&str> = prev.lines().collect();
563    let new_lines: Vec<&str> = new.lines().collect();
564
565    if new_lines.is_empty() {
566        return "";
567    }
568
569    // prev is non-empty (early return above), so last() always succeeds
570    let last_prev = prev_lines[prev_lines.len() - 1];
571
572    // Find the last line of prev in new to determine the overlap boundary
573    for i in (0..new_lines.len()).rev() {
574        if new_lines[i] == last_prev {
575            // Verify contiguous overlap: check that lines before this match too
576            let overlap_len = prev_lines.len().min(i + 1);
577            let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
578            let new_overlap = &new_lines[i + 1 - overlap_len..=i];
579            if prev_tail == new_overlap {
580                if i + 1 < new_lines.len() {
581                    // Return the slice of `new` after the overlap
582                    let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
583                    return new.get(consumed.min(new.len())..).unwrap_or("");
584                }
585                return "";
586            }
587        }
588    }
589
590    // No overlap found — output changed completely, print it all
591    new
592}
593
594/// Follow logs by polling, printing only new output. Returns when the session ends.
595async fn follow_logs(
596    client: &reqwest::Client,
597    base: &str,
598    name: &str,
599    lines: usize,
600    token: Option<&str>,
601    writer: &mut (dyn std::io::Write + Send),
602) -> Result<()> {
603    let mut prev_output = fetch_output(client, base, name, lines, token).await?;
604    write!(writer, "{prev_output}")?;
605
606    loop {
607        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
608
609        // Check session status
610        let status = fetch_session_status(client, base, name, token).await?;
611        let is_terminal = status == "completed" || status == "dead" || status == "stale";
612
613        // Fetch latest output
614        let new_output = fetch_output(client, base, name, lines, token).await?;
615
616        let diff = diff_output(&prev_output, &new_output);
617        if !diff.is_empty() {
618            write!(writer, "{diff}")?;
619        }
620        prev_output = new_output;
621
622        if is_terminal {
623            break;
624        }
625    }
626    Ok(())
627}
628
629// --- Crontab wrapper ---
630
631#[cfg_attr(coverage, allow(dead_code))]
632const CRONTAB_TAG: &str = "#pulpo:";
633
634/// Read the current crontab. Returns empty string if no crontab exists.
635#[cfg(not(coverage))]
636fn read_crontab() -> Result<String> {
637    let output = std::process::Command::new("crontab").arg("-l").output()?;
638    if output.status.success() {
639        Ok(String::from_utf8_lossy(&output.stdout).to_string())
640    } else {
641        Ok(String::new())
642    }
643}
644
645/// Write the given content as the user's crontab.
646#[cfg(not(coverage))]
647fn write_crontab(content: &str) -> Result<()> {
648    use std::io::Write;
649    let mut child = std::process::Command::new("crontab")
650        .arg("-")
651        .stdin(std::process::Stdio::piped())
652        .spawn()?;
653    child
654        .stdin
655        .as_mut()
656        .unwrap()
657        .write_all(content.as_bytes())?;
658    let status = child.wait()?;
659    if !status.success() {
660        anyhow::bail!("crontab write failed");
661    }
662    Ok(())
663}
664
665/// Build the crontab line for a pulpo schedule.
666#[cfg_attr(coverage, allow(dead_code))]
667fn build_crontab_line(
668    name: &str,
669    cron: &str,
670    workdir: &str,
671    provider: &str,
672    prompt: &str,
673    node: &str,
674) -> String {
675    format!(
676        "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
677    )
678}
679
680/// Install a cron schedule into a crontab string. Returns the updated crontab.
681#[cfg_attr(coverage, allow(dead_code))]
682fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
683    let tag = format!("{CRONTAB_TAG}{name}");
684    if crontab.contains(&tag) {
685        anyhow::bail!("schedule \"{name}\" already exists — remove it first");
686    }
687    let mut result = crontab.to_owned();
688    result.push_str(line);
689    Ok(result)
690}
691
692/// Format pulpo crontab entries for display.
693#[cfg_attr(coverage, allow(dead_code))]
694fn crontab_list(crontab: &str) -> String {
695    let entries: Vec<&str> = crontab
696        .lines()
697        .filter(|l| l.contains(CRONTAB_TAG))
698        .collect();
699    if entries.is_empty() {
700        return "No pulpo schedules.".into();
701    }
702    let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
703    for entry in entries {
704        let paused = entry.starts_with('#');
705        let raw = entry.trim_start_matches('#').trim();
706        let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
707        let parts: Vec<&str> = raw.splitn(6, ' ').collect();
708        let cron_expr = if parts.len() >= 5 {
709            parts[..5].join(" ")
710        } else {
711            "?".into()
712        };
713        lines.push(format!(
714            "{:<20} {:<15} {}",
715            name,
716            cron_expr,
717            if paused { "yes" } else { "no" }
718        ));
719    }
720    lines.join("\n")
721}
722
723/// Remove a schedule from a crontab string. Returns the updated crontab.
724#[cfg_attr(coverage, allow(dead_code))]
725fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
726    use std::fmt::Write;
727    let tag = format!("{CRONTAB_TAG}{name}");
728    let filtered =
729        crontab
730            .lines()
731            .filter(|l| !l.contains(&tag))
732            .fold(String::new(), |mut acc, l| {
733                writeln!(acc, "{l}").unwrap();
734                acc
735            });
736    if filtered.len() == crontab.len() {
737        anyhow::bail!("schedule \"{name}\" not found");
738    }
739    Ok(filtered)
740}
741
742/// Pause (comment out) a schedule in a crontab string.
743#[cfg_attr(coverage, allow(dead_code))]
744fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
745    use std::fmt::Write;
746    let tag = format!("{CRONTAB_TAG}{name}");
747    let mut found = false;
748    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
749        if l.contains(&tag) && !l.starts_with('#') {
750            found = true;
751            writeln!(acc, "#{l}").unwrap();
752        } else {
753            writeln!(acc, "{l}").unwrap();
754        }
755        acc
756    });
757    if !found {
758        anyhow::bail!("schedule \"{name}\" not found or already paused");
759    }
760    Ok(updated)
761}
762
763/// Resume (uncomment) a schedule in a crontab string.
764#[cfg_attr(coverage, allow(dead_code))]
765fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
766    use std::fmt::Write;
767    let tag = format!("{CRONTAB_TAG}{name}");
768    let mut found = false;
769    let updated = crontab.lines().fold(String::new(), |mut acc, l| {
770        if l.contains(&tag) && l.starts_with('#') {
771            found = true;
772            writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
773        } else {
774            writeln!(acc, "{l}").unwrap();
775        }
776        acc
777    });
778    if !found {
779        anyhow::bail!("schedule \"{name}\" not found or not paused");
780    }
781    Ok(updated)
782}
783
784/// Execute a schedule subcommand using the crontab wrapper.
785#[cfg(not(coverage))]
786fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
787    match action {
788        ScheduleAction::Install {
789            name,
790            cron,
791            workdir,
792            provider,
793            prompt,
794        } => {
795            let crontab = read_crontab()?;
796            let joined_prompt = prompt.join(" ");
797            let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
798            let updated = crontab_install(&crontab, name, &line)?;
799            write_crontab(&updated)?;
800            Ok(format!("Installed schedule \"{name}\""))
801        }
802        ScheduleAction::List => {
803            let crontab = read_crontab()?;
804            Ok(crontab_list(&crontab))
805        }
806        ScheduleAction::Remove { name } => {
807            let crontab = read_crontab()?;
808            let updated = crontab_remove(&crontab, name)?;
809            write_crontab(&updated)?;
810            Ok(format!("Removed schedule \"{name}\""))
811        }
812        ScheduleAction::Pause { name } => {
813            let crontab = read_crontab()?;
814            let updated = crontab_pause(&crontab, name)?;
815            write_crontab(&updated)?;
816            Ok(format!("Paused schedule \"{name}\""))
817        }
818        ScheduleAction::Resume { name } => {
819            let crontab = read_crontab()?;
820            let updated = crontab_resume(&crontab, name)?;
821            write_crontab(&updated)?;
822            Ok(format!("Resumed schedule \"{name}\""))
823        }
824    }
825}
826
827/// Stub for coverage builds — crontab is real I/O.
828#[cfg(coverage)]
829fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
830    Ok(String::new())
831}
832
833/// Execute the given CLI command against the specified node.
834#[allow(clippy::too_many_lines)]
835pub async fn execute(cli: &Cli) -> Result<String> {
836    let url = base_url(&cli.node);
837    let client = reqwest::Client::new();
838    let node = &cli.node;
839    let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;
840
841    match &cli.command {
842        Commands::Attach { name } => {
843            // Fetch session to get actual backend_session_id
844            let resp = authed_get(
845                &client,
846                format!("{url}/api/v1/sessions/{name}"),
847                token.as_deref(),
848            )
849            .send()
850            .await
851            .map_err(|e| friendly_error(&e, node))?;
852            let text = ok_or_api_error(resp).await?;
853            let session: Session = serde_json::from_str(&text)?;
854            let backend_id = session
855                .backend_session_id
856                .unwrap_or_else(|| format!("pulpo-{name}"));
857            attach_session(&backend_id)?;
858            Ok(format!("Detached from session {name}."))
859        }
860        Commands::Input { name, text } => {
861            let input_text = text.as_deref().unwrap_or("\n");
862            let body = serde_json::json!({ "text": input_text });
863            let resp = authed_post(
864                &client,
865                format!("{url}/api/v1/sessions/{name}/input"),
866                token.as_deref(),
867            )
868            .json(&body)
869            .send()
870            .await
871            .map_err(|e| friendly_error(&e, node))?;
872            ok_or_api_error(resp).await?;
873            Ok(format!("Sent input to session {name}."))
874        }
875        Commands::List => {
876            let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
877                .send()
878                .await
879                .map_err(|e| friendly_error(&e, node))?;
880            let text = ok_or_api_error(resp).await?;
881            let sessions: Vec<Session> = serde_json::from_str(&text)?;
882            Ok(format_sessions(&sessions))
883        }
884        Commands::Nodes => {
885            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
886                .send()
887                .await
888                .map_err(|e| friendly_error(&e, node))?;
889            let text = ok_or_api_error(resp).await?;
890            let resp: PeersResponse = serde_json::from_str(&text)?;
891            Ok(format_nodes(&resp))
892        }
893        Commands::Spawn {
894            workdir,
895            name,
896            provider,
897            auto,
898            unrestricted,
899            model,
900            system_prompt,
901            allowed_tools,
902            ink,
903            max_turns,
904            max_budget,
905            output_format,
906            prompt,
907        } => {
908            let prompt_text = prompt.join(" ");
909            let mode = if *auto { "autonomous" } else { "interactive" };
910            // Resolve workdir: --workdir flag > current directory
911            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
912                std::env::current_dir()
913                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
914            });
915            let mut body = serde_json::json!({
916                "workdir": resolved_workdir,
917                "mode": mode,
918            });
919            // Only include prompt if non-empty
920            if !prompt_text.is_empty() {
921                body["prompt"] = serde_json::json!(prompt_text);
922            }
923            // Only include provider if explicitly specified
924            if let Some(p) = provider {
925                body["provider"] = serde_json::json!(p);
926            }
927            if *unrestricted {
928                body["unrestricted"] = serde_json::json!(true);
929            }
930            if let Some(n) = name {
931                body["name"] = serde_json::json!(n);
932            }
933            if let Some(m) = model {
934                body["model"] = serde_json::json!(m);
935            }
936            if let Some(sp) = system_prompt {
937                body["system_prompt"] = serde_json::json!(sp);
938            }
939            if let Some(tools) = allowed_tools {
940                body["allowed_tools"] = serde_json::json!(tools);
941            }
942            if let Some(p) = ink {
943                body["ink"] = serde_json::json!(p);
944            }
945            if let Some(mt) = max_turns {
946                body["max_turns"] = serde_json::json!(mt);
947            }
948            if let Some(mb) = max_budget {
949                body["max_budget_usd"] = serde_json::json!(mb);
950            }
951            if let Some(of) = output_format {
952                body["output_format"] = serde_json::json!(of);
953            }
954            let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
955                .json(&body)
956                .send()
957                .await
958                .map_err(|e| friendly_error(&e, node))?;
959            let text = ok_or_api_error(resp).await?;
960            let session: Session = serde_json::from_str(&text)?;
961            Ok(format!(
962                "Created session \"{}\" ({})",
963                session.name, session.id
964            ))
965        }
966        Commands::Kill { name } => {
967            let resp = authed_post(
968                &client,
969                format!("{url}/api/v1/sessions/{name}/kill"),
970                token.as_deref(),
971            )
972            .send()
973            .await
974            .map_err(|e| friendly_error(&e, node))?;
975            ok_or_api_error(resp).await?;
976            Ok(format!("Session {name} killed."))
977        }
978        Commands::Delete { name } => {
979            let resp = authed_delete(
980                &client,
981                format!("{url}/api/v1/sessions/{name}"),
982                token.as_deref(),
983            )
984            .send()
985            .await
986            .map_err(|e| friendly_error(&e, node))?;
987            ok_or_api_error(resp).await?;
988            Ok(format!("Session {name} deleted."))
989        }
990        Commands::Logs {
991            name,
992            lines,
993            follow,
994        } => {
995            if *follow {
996                let mut stdout = std::io::stdout();
997                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
998                    .await
999                    .map_err(|e| {
1000                        // Unwrap reqwest errors to friendly messages
1001                        match e.downcast::<reqwest::Error>() {
1002                            Ok(re) => friendly_error(&re, node),
1003                            Err(other) => other,
1004                        }
1005                    })?;
1006                Ok(String::new())
1007            } else {
1008                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1009                    .await
1010                    .map_err(|e| match e.downcast::<reqwest::Error>() {
1011                        Ok(re) => friendly_error(&re, node),
1012                        Err(other) => other,
1013                    })?;
1014                Ok(output)
1015            }
1016        }
1017        Commands::Interventions { name } => {
1018            let resp = authed_get(
1019                &client,
1020                format!("{url}/api/v1/sessions/{name}/interventions"),
1021                token.as_deref(),
1022            )
1023            .send()
1024            .await
1025            .map_err(|e| friendly_error(&e, node))?;
1026            let text = ok_or_api_error(resp).await?;
1027            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1028            Ok(format_interventions(&events))
1029        }
1030        Commands::Ui => {
1031            let dashboard = base_url(&cli.node);
1032            open_browser(&dashboard)?;
1033            Ok(format!("Opening {dashboard}"))
1034        }
1035        Commands::Resume { name } => {
1036            let resp = authed_post(
1037                &client,
1038                format!("{url}/api/v1/sessions/{name}/resume"),
1039                token.as_deref(),
1040            )
1041            .send()
1042            .await
1043            .map_err(|e| friendly_error(&e, node))?;
1044            let text = ok_or_api_error(resp).await?;
1045            let session: Session = serde_json::from_str(&text)?;
1046            Ok(format!("Resumed session \"{}\"", session.name))
1047        }
1048        Commands::Knowledge {
1049            session,
1050            kind,
1051            repo,
1052            ink,
1053            limit,
1054            context,
1055            get,
1056            delete,
1057            push,
1058        } => {
1059            // Single-item get
1060            if let Some(id) = get {
1061                let endpoint = format!("{url}/api/v1/knowledge/{id}");
1062                let resp = authed_get(&client, endpoint, token.as_deref())
1063                    .send()
1064                    .await
1065                    .map_err(|e| friendly_error(&e, node))?;
1066                let text = ok_or_api_error(resp).await?;
1067                let resp: KnowledgeItemResponse = serde_json::from_str(&text)?;
1068                return Ok(format_knowledge(&[resp.knowledge]));
1069            }
1070
1071            // Delete by ID
1072            if let Some(id) = delete {
1073                let endpoint = format!("{url}/api/v1/knowledge/{id}");
1074                let resp = authed_delete(&client, endpoint, token.as_deref())
1075                    .send()
1076                    .await
1077                    .map_err(|e| friendly_error(&e, node))?;
1078                let text = ok_or_api_error(resp).await?;
1079                let resp: KnowledgeDeleteResponse = serde_json::from_str(&text)?;
1080                return Ok(if resp.deleted {
1081                    format!("Deleted knowledge item {id}")
1082                } else {
1083                    format!("Knowledge item {id} not found")
1084                });
1085            }
1086
1087            // Push to remote
1088            if *push {
1089                let endpoint = format!("{url}/api/v1/knowledge/push");
1090                let resp = authed_post(&client, endpoint, token.as_deref())
1091                    .send()
1092                    .await
1093                    .map_err(|e| friendly_error(&e, node))?;
1094                let text = ok_or_api_error(resp).await?;
1095                let resp: KnowledgePushResponse = serde_json::from_str(&text)?;
1096                return Ok(resp.message);
1097            }
1098
1099            // List / context query
1100            let mut params = vec![format!("limit={limit}")];
1101            let endpoint = if *context {
1102                if let Some(r) = repo {
1103                    params.push(format!("workdir={r}"));
1104                }
1105                if let Some(i) = ink {
1106                    params.push(format!("ink={i}"));
1107                }
1108                format!("{url}/api/v1/knowledge/context?{}", params.join("&"))
1109            } else {
1110                if let Some(s) = session {
1111                    params.push(format!("session_id={s}"));
1112                }
1113                if let Some(k) = kind {
1114                    params.push(format!("kind={k}"));
1115                }
1116                if let Some(r) = repo {
1117                    params.push(format!("repo={r}"));
1118                }
1119                if let Some(i) = ink {
1120                    params.push(format!("ink={i}"));
1121                }
1122                format!("{url}/api/v1/knowledge?{}", params.join("&"))
1123            };
1124            let resp = authed_get(&client, endpoint, token.as_deref())
1125                .send()
1126                .await
1127                .map_err(|e| friendly_error(&e, node))?;
1128            let text = ok_or_api_error(resp).await?;
1129            let resp: KnowledgeResponse = serde_json::from_str(&text)?;
1130            Ok(format_knowledge(&resp.knowledge))
1131        }
1132        Commands::Schedule { action } => execute_schedule(action, node),
1133    }
1134}
1135
1136#[cfg(test)]
1137mod tests {
1138    use super::*;
1139
1140    #[test]
1141    fn test_base_url() {
1142        assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1143        assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1144    }
1145
1146    #[test]
1147    fn test_cli_parse_list() {
1148        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1149        assert_eq!(cli.node, "localhost:7433");
1150        assert!(matches!(cli.command, Commands::List));
1151    }
1152
1153    #[test]
1154    fn test_cli_parse_nodes() {
1155        let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1156        assert!(matches!(cli.command, Commands::Nodes));
1157    }
1158
1159    #[test]
1160    fn test_cli_parse_ui() {
1161        let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1162        assert!(matches!(cli.command, Commands::Ui));
1163    }
1164
1165    #[test]
1166    fn test_cli_parse_ui_custom_node() {
1167        let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1168        assert!(matches!(cli.command, Commands::Ui));
1169        assert_eq!(cli.node, "mac-mini:7433");
1170    }
1171
1172    #[test]
1173    fn test_build_open_command() {
1174        let cmd = build_open_command("http://localhost:7433");
1175        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1176        assert_eq!(args, vec!["http://localhost:7433"]);
1177        #[cfg(target_os = "macos")]
1178        assert_eq!(cmd.get_program(), "open");
1179        #[cfg(target_os = "linux")]
1180        assert_eq!(cmd.get_program(), "xdg-open");
1181    }
1182
1183    #[test]
1184    fn test_cli_parse_spawn() {
1185        let cli = Cli::try_parse_from([
1186            "pulpo",
1187            "spawn",
1188            "--workdir",
1189            "/tmp/repo",
1190            "Fix",
1191            "the",
1192            "bug",
1193        ])
1194        .unwrap();
1195        assert!(matches!(
1196            &cli.command,
1197            Commands::Spawn { workdir, provider, auto, unrestricted, prompt, .. }
1198                if workdir.as_deref() == Some("/tmp/repo") && provider.is_none() && !auto
1199                && !unrestricted && prompt == &["Fix", "the", "bug"]
1200        ));
1201    }
1202
1203    #[test]
1204    fn test_cli_parse_spawn_with_provider() {
1205        let cli = Cli::try_parse_from([
1206            "pulpo",
1207            "spawn",
1208            "--workdir",
1209            "/tmp",
1210            "--provider",
1211            "codex",
1212            "Do it",
1213        ])
1214        .unwrap();
1215        assert!(matches!(
1216            &cli.command,
1217            Commands::Spawn { provider, .. } if provider.as_deref() == Some("codex")
1218        ));
1219    }
1220
1221    #[test]
1222    fn test_cli_parse_spawn_auto() {
1223        let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "--auto", "Do it"])
1224            .unwrap();
1225        assert!(matches!(
1226            &cli.command,
1227            Commands::Spawn { auto, .. } if *auto
1228        ));
1229    }
1230
1231    #[test]
1232    fn test_cli_parse_spawn_unrestricted() {
1233        let cli = Cli::try_parse_from([
1234            "pulpo",
1235            "spawn",
1236            "--workdir",
1237            "/tmp",
1238            "--unrestricted",
1239            "Do it",
1240        ])
1241        .unwrap();
1242        assert!(matches!(
1243            &cli.command,
1244            Commands::Spawn { unrestricted, .. } if *unrestricted
1245        ));
1246    }
1247
1248    #[test]
1249    fn test_cli_parse_spawn_unrestricted_default() {
1250        let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "Do it"]).unwrap();
1251        assert!(matches!(
1252            &cli.command,
1253            Commands::Spawn { unrestricted, .. } if !unrestricted
1254        ));
1255    }
1256
1257    #[test]
1258    fn test_cli_parse_spawn_with_name() {
1259        let cli = Cli::try_parse_from([
1260            "pulpo",
1261            "spawn",
1262            "--workdir",
1263            "/tmp/repo",
1264            "--name",
1265            "my-task",
1266            "Fix it",
1267        ])
1268        .unwrap();
1269        assert!(matches!(
1270            &cli.command,
1271            Commands::Spawn { workdir, name, .. }
1272                if workdir.as_deref() == Some("/tmp/repo") && name.as_deref() == Some("my-task")
1273        ));
1274    }
1275
1276    #[test]
1277    fn test_cli_parse_spawn_without_name() {
1278        let cli =
1279            Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"]).unwrap();
1280        assert!(matches!(
1281            &cli.command,
1282            Commands::Spawn { name, .. } if name.is_none()
1283        ));
1284    }
1285
1286    #[test]
1287    fn test_cli_parse_logs() {
1288        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1289        assert!(matches!(
1290            &cli.command,
1291            Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1292        ));
1293    }
1294
1295    #[test]
1296    fn test_cli_parse_logs_with_lines() {
1297        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1298        assert!(matches!(
1299            &cli.command,
1300            Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 50 && !follow
1301        ));
1302    }
1303
1304    #[test]
1305    fn test_cli_parse_logs_follow() {
1306        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1307        assert!(matches!(
1308            &cli.command,
1309            Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1310        ));
1311    }
1312
1313    #[test]
1314    fn test_cli_parse_logs_follow_short() {
1315        let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1316        assert!(matches!(
1317            &cli.command,
1318            Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1319        ));
1320    }
1321
1322    #[test]
1323    fn test_cli_parse_kill() {
1324        let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1325        assert!(matches!(
1326            &cli.command,
1327            Commands::Kill { name } if name == "my-session"
1328        ));
1329    }
1330
1331    #[test]
1332    fn test_cli_parse_delete() {
1333        let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1334        assert!(matches!(
1335            &cli.command,
1336            Commands::Delete { name } if name == "my-session"
1337        ));
1338    }
1339
1340    #[test]
1341    fn test_cli_parse_resume() {
1342        let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1343        assert!(matches!(
1344            &cli.command,
1345            Commands::Resume { name } if name == "my-session"
1346        ));
1347    }
1348
1349    #[test]
1350    fn test_cli_parse_input() {
1351        let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1352        assert!(matches!(
1353            &cli.command,
1354            Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("yes")
1355        ));
1356    }
1357
1358    #[test]
1359    fn test_cli_parse_input_no_text() {
1360        let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1361        assert!(matches!(
1362            &cli.command,
1363            Commands::Input { name, text } if name == "my-session" && text.is_none()
1364        ));
1365    }
1366
1367    #[test]
1368    fn test_cli_parse_input_alias() {
1369        let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1370        assert!(matches!(
1371            &cli.command,
1372            Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("y")
1373        ));
1374    }
1375
1376    #[test]
1377    fn test_cli_parse_custom_node() {
1378        let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1379        assert_eq!(cli.node, "win-pc:8080");
1380    }
1381
1382    #[test]
1383    fn test_cli_version() {
1384        let result = Cli::try_parse_from(["pulpo", "--version"]);
1385        // clap exits with an error (kind DisplayVersion) when --version is used
1386        let err = result.unwrap_err();
1387        assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1388    }
1389
1390    #[test]
1391    fn test_cli_parse_no_subcommand_fails() {
1392        let result = Cli::try_parse_from(["pulpo"]);
1393        assert!(result.is_err());
1394    }
1395
1396    #[test]
1397    fn test_cli_debug() {
1398        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1399        let debug = format!("{cli:?}");
1400        assert!(debug.contains("List"));
1401    }
1402
1403    #[test]
1404    fn test_commands_debug() {
1405        let cmd = Commands::List;
1406        assert_eq!(format!("{cmd:?}"), "List");
1407    }
1408
1409    /// A valid Session JSON for test responses.
1410    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","provider":"claude","prompt":"Fix bug","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1411
1412    /// Start a lightweight test HTTP server and return its address.
1413    async fn start_test_server() -> String {
1414        use axum::http::StatusCode;
1415        use axum::{
1416            Json, Router,
1417            routing::{get, post},
1418        };
1419
1420        let session_json = TEST_SESSION_JSON;
1421
1422        let app = Router::new()
1423            .route(
1424                "/api/v1/sessions",
1425                get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1426                    (StatusCode::CREATED, session_json.to_owned())
1427                }),
1428            )
1429            .route(
1430                "/api/v1/sessions/{id}",
1431                get(move || async move { session_json.to_owned() })
1432                    .delete(|| async { StatusCode::NO_CONTENT }),
1433            )
1434            .route(
1435                "/api/v1/sessions/{id}/kill",
1436                post(|| async { StatusCode::NO_CONTENT }),
1437            )
1438            .route(
1439                "/api/v1/sessions/{id}/output",
1440                get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1441            )
1442            .route(
1443                "/api/v1/peers",
1444                get(|| async {
1445                    r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1446                }),
1447            )
1448            .route(
1449                "/api/v1/sessions/{id}/resume",
1450                axum::routing::post(move || async move { session_json.to_owned() }),
1451            )
1452            .route(
1453                "/api/v1/sessions/{id}/interventions",
1454                get(|| async { "[]".to_owned() }),
1455            )
1456            .route(
1457                "/api/v1/sessions/{id}/input",
1458                post(|| async { StatusCode::NO_CONTENT }),
1459            );
1460
1461        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1462        let addr = listener.local_addr().unwrap();
1463        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1464        format!("127.0.0.1:{}", addr.port())
1465    }
1466
1467    #[tokio::test]
1468    async fn test_execute_list_success() {
1469        let node = start_test_server().await;
1470        let cli = Cli {
1471            node,
1472            token: None,
1473            command: Commands::List,
1474        };
1475        let result = execute(&cli).await.unwrap();
1476        assert_eq!(result, "No sessions.");
1477    }
1478
1479    #[tokio::test]
1480    async fn test_execute_nodes_success() {
1481        let node = start_test_server().await;
1482        let cli = Cli {
1483            node,
1484            token: None,
1485            command: Commands::Nodes,
1486        };
1487        let result = execute(&cli).await.unwrap();
1488        assert!(result.contains("test"));
1489        assert!(result.contains("(local)"));
1490        assert!(result.contains("NAME"));
1491    }
1492
1493    #[tokio::test]
1494    async fn test_execute_spawn_success() {
1495        let node = start_test_server().await;
1496        let cli = Cli {
1497            node,
1498            token: None,
1499            command: Commands::Spawn {
1500                workdir: Some("/tmp/repo".into()),
1501                name: None,
1502                provider: Some("claude".into()),
1503                auto: false,
1504                unrestricted: false,
1505                model: None,
1506                system_prompt: None,
1507                allowed_tools: None,
1508                ink: None,
1509                max_turns: None,
1510                max_budget: None,
1511                output_format: None,
1512                prompt: vec!["Fix".into(), "bug".into()],
1513            },
1514        };
1515        let result = execute(&cli).await.unwrap();
1516        assert!(result.contains("Created session"));
1517        assert!(result.contains("repo"));
1518    }
1519
1520    #[tokio::test]
1521    async fn test_execute_spawn_with_all_new_flags() {
1522        let node = start_test_server().await;
1523        let cli = Cli {
1524            node,
1525            token: None,
1526            command: Commands::Spawn {
1527                workdir: Some("/tmp/repo".into()),
1528                name: None,
1529                provider: Some("claude".into()),
1530                auto: false,
1531                unrestricted: false,
1532                model: Some("opus".into()),
1533                system_prompt: Some("Be helpful".into()),
1534                allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1535                ink: Some("coder".into()),
1536                max_turns: Some(5),
1537                max_budget: Some(2.5),
1538                output_format: Some("json".into()),
1539                prompt: vec!["Fix".into(), "bug".into()],
1540            },
1541        };
1542        let result = execute(&cli).await.unwrap();
1543        assert!(result.contains("Created session"));
1544    }
1545
1546    #[tokio::test]
1547    async fn test_execute_spawn_auto_mode() {
1548        let node = start_test_server().await;
1549        let cli = Cli {
1550            node,
1551            token: None,
1552            command: Commands::Spawn {
1553                workdir: Some("/tmp/repo".into()),
1554                name: None,
1555                provider: Some("claude".into()),
1556                auto: true,
1557                unrestricted: false,
1558                model: None,
1559                system_prompt: None,
1560                allowed_tools: None,
1561                ink: None,
1562                max_turns: None,
1563                max_budget: None,
1564                output_format: None,
1565                prompt: vec!["Do it".into()],
1566            },
1567        };
1568        let result = execute(&cli).await.unwrap();
1569        assert!(result.contains("Created session"));
1570    }
1571
1572    #[tokio::test]
1573    async fn test_execute_spawn_with_name() {
1574        let node = start_test_server().await;
1575        let cli = Cli {
1576            node,
1577            token: None,
1578            command: Commands::Spawn {
1579                workdir: Some("/tmp/repo".into()),
1580                name: Some("my-task".into()),
1581                provider: Some("claude".into()),
1582                auto: false,
1583                unrestricted: false,
1584                model: None,
1585                system_prompt: None,
1586                allowed_tools: None,
1587                ink: None,
1588                max_turns: None,
1589                max_budget: None,
1590                output_format: None,
1591                prompt: vec!["Fix".into(), "bug".into()],
1592            },
1593        };
1594        let result = execute(&cli).await.unwrap();
1595        assert!(result.contains("Created session"));
1596    }
1597
1598    #[tokio::test]
1599    async fn test_execute_kill_success() {
1600        let node = start_test_server().await;
1601        let cli = Cli {
1602            node,
1603            token: None,
1604            command: Commands::Kill {
1605                name: "test-session".into(),
1606            },
1607        };
1608        let result = execute(&cli).await.unwrap();
1609        assert!(result.contains("killed"));
1610    }
1611
1612    #[tokio::test]
1613    async fn test_execute_delete_success() {
1614        let node = start_test_server().await;
1615        let cli = Cli {
1616            node,
1617            token: None,
1618            command: Commands::Delete {
1619                name: "test-session".into(),
1620            },
1621        };
1622        let result = execute(&cli).await.unwrap();
1623        assert!(result.contains("deleted"));
1624    }
1625
1626    #[tokio::test]
1627    async fn test_execute_logs_success() {
1628        let node = start_test_server().await;
1629        let cli = Cli {
1630            node,
1631            token: None,
1632            command: Commands::Logs {
1633                name: "test-session".into(),
1634                lines: 50,
1635                follow: false,
1636            },
1637        };
1638        let result = execute(&cli).await.unwrap();
1639        assert!(result.contains("test output"));
1640    }
1641
1642    #[tokio::test]
1643    async fn test_execute_list_connection_refused() {
1644        let cli = Cli {
1645            node: "localhost:1".into(),
1646            token: None,
1647            command: Commands::List,
1648        };
1649        let result = execute(&cli).await;
1650        let err = result.unwrap_err().to_string();
1651        assert!(
1652            err.contains("Could not connect to pulpod"),
1653            "Expected friendly error, got: {err}"
1654        );
1655        assert!(err.contains("localhost:1"));
1656    }
1657
1658    #[tokio::test]
1659    async fn test_execute_nodes_connection_refused() {
1660        let cli = Cli {
1661            node: "localhost:1".into(),
1662            token: None,
1663            command: Commands::Nodes,
1664        };
1665        let result = execute(&cli).await;
1666        let err = result.unwrap_err().to_string();
1667        assert!(err.contains("Could not connect to pulpod"));
1668    }
1669
1670    #[tokio::test]
1671    async fn test_execute_kill_error_response() {
1672        use axum::{Router, http::StatusCode, routing::post};
1673
1674        let app = Router::new().route(
1675            "/api/v1/sessions/{id}/kill",
1676            post(|| async {
1677                (
1678                    StatusCode::NOT_FOUND,
1679                    "{\"error\":\"session not found: test-session\"}",
1680                )
1681            }),
1682        );
1683        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1684        let addr = listener.local_addr().unwrap();
1685        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1686        let node = format!("127.0.0.1:{}", addr.port());
1687
1688        let cli = Cli {
1689            node,
1690            token: None,
1691            command: Commands::Kill {
1692                name: "test-session".into(),
1693            },
1694        };
1695        let err = execute(&cli).await.unwrap_err();
1696        assert_eq!(err.to_string(), "session not found: test-session");
1697    }
1698
1699    #[tokio::test]
1700    async fn test_execute_delete_error_response() {
1701        use axum::{Router, http::StatusCode, routing::delete};
1702
1703        let app = Router::new().route(
1704            "/api/v1/sessions/{id}",
1705            delete(|| async {
1706                (
1707                    StatusCode::CONFLICT,
1708                    "{\"error\":\"cannot delete session in 'running' state\"}",
1709                )
1710            }),
1711        );
1712        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1713        let addr = listener.local_addr().unwrap();
1714        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1715        let node = format!("127.0.0.1:{}", addr.port());
1716
1717        let cli = Cli {
1718            node,
1719            token: None,
1720            command: Commands::Delete {
1721                name: "test-session".into(),
1722            },
1723        };
1724        let err = execute(&cli).await.unwrap_err();
1725        assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1726    }
1727
1728    #[tokio::test]
1729    async fn test_execute_logs_error_response() {
1730        use axum::{Router, http::StatusCode, routing::get};
1731
1732        let app = Router::new().route(
1733            "/api/v1/sessions/{id}/output",
1734            get(|| async {
1735                (
1736                    StatusCode::NOT_FOUND,
1737                    "{\"error\":\"session not found: ghost\"}",
1738                )
1739            }),
1740        );
1741        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1742        let addr = listener.local_addr().unwrap();
1743        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1744        let node = format!("127.0.0.1:{}", addr.port());
1745
1746        let cli = Cli {
1747            node,
1748            token: None,
1749            command: Commands::Logs {
1750                name: "ghost".into(),
1751                lines: 50,
1752                follow: false,
1753            },
1754        };
1755        let err = execute(&cli).await.unwrap_err();
1756        assert_eq!(err.to_string(), "session not found: ghost");
1757    }
1758
1759    #[tokio::test]
1760    async fn test_execute_resume_error_response() {
1761        use axum::{Router, http::StatusCode, routing::post};
1762
1763        let app = Router::new().route(
1764            "/api/v1/sessions/{id}/resume",
1765            post(|| async {
1766                (
1767                    StatusCode::BAD_REQUEST,
1768                    "{\"error\":\"session is not stale (status: running)\"}",
1769                )
1770            }),
1771        );
1772        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1773        let addr = listener.local_addr().unwrap();
1774        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1775        let node = format!("127.0.0.1:{}", addr.port());
1776
1777        let cli = Cli {
1778            node,
1779            token: None,
1780            command: Commands::Resume {
1781                name: "test-session".into(),
1782            },
1783        };
1784        let err = execute(&cli).await.unwrap_err();
1785        assert_eq!(err.to_string(), "session is not stale (status: running)");
1786    }
1787
1788    #[tokio::test]
1789    async fn test_execute_spawn_error_response() {
1790        use axum::{Router, http::StatusCode, routing::post};
1791
1792        let app = Router::new().route(
1793            "/api/v1/sessions",
1794            post(|| async {
1795                (
1796                    StatusCode::INTERNAL_SERVER_ERROR,
1797                    "{\"error\":\"failed to spawn session\"}",
1798                )
1799            }),
1800        );
1801        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1802        let addr = listener.local_addr().unwrap();
1803        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1804        let node = format!("127.0.0.1:{}", addr.port());
1805
1806        let cli = Cli {
1807            node,
1808            token: None,
1809            command: Commands::Spawn {
1810                workdir: Some("/tmp/repo".into()),
1811                name: None,
1812                provider: Some("claude".into()),
1813                auto: false,
1814                unrestricted: false,
1815                model: None,
1816                system_prompt: None,
1817                allowed_tools: None,
1818                ink: None,
1819                max_turns: None,
1820                max_budget: None,
1821                output_format: None,
1822                prompt: vec!["test".into()],
1823            },
1824        };
1825        let err = execute(&cli).await.unwrap_err();
1826        assert_eq!(err.to_string(), "failed to spawn session");
1827    }
1828
1829    #[tokio::test]
1830    async fn test_execute_interventions_error_response() {
1831        use axum::{Router, http::StatusCode, routing::get};
1832
1833        let app = Router::new().route(
1834            "/api/v1/sessions/{id}/interventions",
1835            get(|| async {
1836                (
1837                    StatusCode::NOT_FOUND,
1838                    "{\"error\":\"session not found: ghost\"}",
1839                )
1840            }),
1841        );
1842        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1843        let addr = listener.local_addr().unwrap();
1844        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1845        let node = format!("127.0.0.1:{}", addr.port());
1846
1847        let cli = Cli {
1848            node,
1849            token: None,
1850            command: Commands::Interventions {
1851                name: "ghost".into(),
1852            },
1853        };
1854        let err = execute(&cli).await.unwrap_err();
1855        assert_eq!(err.to_string(), "session not found: ghost");
1856    }
1857
1858    #[tokio::test]
1859    async fn test_execute_resume_success() {
1860        let node = start_test_server().await;
1861        let cli = Cli {
1862            node,
1863            token: None,
1864            command: Commands::Resume {
1865                name: "test-session".into(),
1866            },
1867        };
1868        let result = execute(&cli).await.unwrap();
1869        assert!(result.contains("Resumed session"));
1870        assert!(result.contains("repo"));
1871    }
1872
1873    #[tokio::test]
1874    async fn test_execute_input_success() {
1875        let node = start_test_server().await;
1876        let cli = Cli {
1877            node,
1878            token: None,
1879            command: Commands::Input {
1880                name: "test-session".into(),
1881                text: Some("yes".into()),
1882            },
1883        };
1884        let result = execute(&cli).await.unwrap();
1885        assert!(result.contains("Sent input to session test-session"));
1886    }
1887
1888    #[tokio::test]
1889    async fn test_execute_input_no_text() {
1890        let node = start_test_server().await;
1891        let cli = Cli {
1892            node,
1893            token: None,
1894            command: Commands::Input {
1895                name: "test-session".into(),
1896                text: None,
1897            },
1898        };
1899        let result = execute(&cli).await.unwrap();
1900        assert!(result.contains("Sent input to session test-session"));
1901    }
1902
1903    #[tokio::test]
1904    async fn test_execute_input_connection_refused() {
1905        let cli = Cli {
1906            node: "localhost:1".into(),
1907            token: None,
1908            command: Commands::Input {
1909                name: "test".into(),
1910                text: Some("y".into()),
1911            },
1912        };
1913        let result = execute(&cli).await;
1914        let err = result.unwrap_err().to_string();
1915        assert!(err.contains("Could not connect to pulpod"));
1916    }
1917
1918    #[tokio::test]
1919    async fn test_execute_input_error_response() {
1920        use axum::{Router, http::StatusCode, routing::post};
1921
1922        let app = Router::new().route(
1923            "/api/v1/sessions/{id}/input",
1924            post(|| async {
1925                (
1926                    StatusCode::NOT_FOUND,
1927                    "{\"error\":\"session not found: ghost\"}",
1928                )
1929            }),
1930        );
1931        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1932        let addr = listener.local_addr().unwrap();
1933        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1934        let node = format!("127.0.0.1:{}", addr.port());
1935
1936        let cli = Cli {
1937            node,
1938            token: None,
1939            command: Commands::Input {
1940                name: "ghost".into(),
1941                text: Some("y".into()),
1942            },
1943        };
1944        let err = execute(&cli).await.unwrap_err();
1945        assert_eq!(err.to_string(), "session not found: ghost");
1946    }
1947
1948    #[tokio::test]
1949    async fn test_execute_ui() {
1950        let cli = Cli {
1951            node: "localhost:7433".into(),
1952            token: None,
1953            command: Commands::Ui,
1954        };
1955        let result = execute(&cli).await.unwrap();
1956        assert!(result.contains("Opening"));
1957        assert!(result.contains("http://localhost:7433"));
1958    }
1959
1960    #[tokio::test]
1961    async fn test_execute_ui_custom_node() {
1962        let cli = Cli {
1963            node: "mac-mini:7433".into(),
1964            token: None,
1965            command: Commands::Ui,
1966        };
1967        let result = execute(&cli).await.unwrap();
1968        assert!(result.contains("http://mac-mini:7433"));
1969    }
1970
1971    #[test]
1972    fn test_format_sessions_empty() {
1973        assert_eq!(format_sessions(&[]), "No sessions.");
1974    }
1975
1976    #[test]
1977    fn test_format_sessions_with_data() {
1978        use chrono::Utc;
1979        use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1980        use uuid::Uuid;
1981
1982        let sessions = vec![Session {
1983            id: Uuid::nil(),
1984            name: "my-api".into(),
1985            workdir: "/tmp/repo".into(),
1986            provider: Provider::Claude,
1987            prompt: "Fix the bug".into(),
1988            status: SessionStatus::Running,
1989            mode: SessionMode::Interactive,
1990            conversation_id: None,
1991            exit_code: None,
1992            backend_session_id: None,
1993            output_snapshot: None,
1994            guard_config: None,
1995            model: None,
1996            allowed_tools: None,
1997            system_prompt: None,
1998            metadata: None,
1999            ink: None,
2000            max_turns: None,
2001            max_budget_usd: None,
2002            output_format: None,
2003            intervention_reason: None,
2004            intervention_at: None,
2005            last_output_at: None,
2006            idle_since: None,
2007            waiting_for_input: false,
2008            created_at: Utc::now(),
2009            updated_at: Utc::now(),
2010        }];
2011        let output = format_sessions(&sessions);
2012        assert!(output.contains("NAME"));
2013        assert!(output.contains("my-api"));
2014        assert!(output.contains("running"));
2015        assert!(output.contains("claude"));
2016        assert!(output.contains("Fix the bug"));
2017    }
2018
2019    #[test]
2020    fn test_format_sessions_long_prompt_truncated() {
2021        use chrono::Utc;
2022        use pulpo_common::session::{Provider, SessionMode, SessionStatus};
2023        use uuid::Uuid;
2024
2025        let sessions = vec![Session {
2026            id: Uuid::nil(),
2027            name: "test".into(),
2028            workdir: "/tmp".into(),
2029            provider: Provider::Codex,
2030            prompt: "A very long prompt that exceeds forty characters in total length".into(),
2031            status: SessionStatus::Completed,
2032            mode: SessionMode::Autonomous,
2033            conversation_id: None,
2034            exit_code: None,
2035            backend_session_id: None,
2036            output_snapshot: None,
2037            guard_config: None,
2038            model: None,
2039            allowed_tools: None,
2040            system_prompt: None,
2041            metadata: None,
2042            ink: None,
2043            max_turns: None,
2044            max_budget_usd: None,
2045            output_format: None,
2046            intervention_reason: None,
2047            intervention_at: None,
2048            last_output_at: None,
2049            idle_since: None,
2050            waiting_for_input: false,
2051            created_at: Utc::now(),
2052            updated_at: Utc::now(),
2053        }];
2054        let output = format_sessions(&sessions);
2055        assert!(output.contains("..."));
2056    }
2057
2058    #[test]
2059    fn test_format_sessions_waiting_for_input() {
2060        use chrono::Utc;
2061        use pulpo_common::session::{Provider, SessionMode, SessionStatus};
2062        use uuid::Uuid;
2063
2064        let sessions = vec![Session {
2065            id: Uuid::nil(),
2066            name: "blocked".into(),
2067            workdir: "/tmp".into(),
2068            provider: Provider::Claude,
2069            prompt: "Fix bug".into(),
2070            status: SessionStatus::Running,
2071            mode: SessionMode::Interactive,
2072            conversation_id: None,
2073            exit_code: None,
2074            backend_session_id: None,
2075            output_snapshot: None,
2076            guard_config: None,
2077            model: None,
2078            allowed_tools: None,
2079            system_prompt: None,
2080            metadata: None,
2081            ink: None,
2082            max_turns: None,
2083            max_budget_usd: None,
2084            output_format: None,
2085            intervention_reason: None,
2086            intervention_at: None,
2087            last_output_at: None,
2088            idle_since: None,
2089            waiting_for_input: true,
2090            created_at: Utc::now(),
2091            updated_at: Utc::now(),
2092        }];
2093        let output = format_sessions(&sessions);
2094        assert!(output.contains("waiting"));
2095        assert!(!output.contains("running"));
2096    }
2097
2098    #[test]
2099    fn test_format_nodes() {
2100        use pulpo_common::node::NodeInfo;
2101        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2102
2103        let resp = PeersResponse {
2104            local: NodeInfo {
2105                name: "mac-mini".into(),
2106                hostname: "h".into(),
2107                os: "macos".into(),
2108                arch: "arm64".into(),
2109                cpus: 8,
2110                memory_mb: 16384,
2111                gpu: None,
2112            },
2113            peers: vec![PeerInfo {
2114                name: "win-pc".into(),
2115                address: "win-pc:7433".into(),
2116                status: PeerStatus::Online,
2117                node_info: None,
2118                session_count: Some(3),
2119                source: PeerSource::Configured,
2120            }],
2121        };
2122        let output = format_nodes(&resp);
2123        assert!(output.contains("mac-mini"));
2124        assert!(output.contains("(local)"));
2125        assert!(output.contains("win-pc"));
2126        assert!(output.contains('3'));
2127    }
2128
2129    #[test]
2130    fn test_format_nodes_no_session_count() {
2131        use pulpo_common::node::NodeInfo;
2132        use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2133
2134        let resp = PeersResponse {
2135            local: NodeInfo {
2136                name: "local".into(),
2137                hostname: "h".into(),
2138                os: "linux".into(),
2139                arch: "x86_64".into(),
2140                cpus: 4,
2141                memory_mb: 8192,
2142                gpu: None,
2143            },
2144            peers: vec![PeerInfo {
2145                name: "peer".into(),
2146                address: "peer:7433".into(),
2147                status: PeerStatus::Offline,
2148                node_info: None,
2149                session_count: None,
2150                source: PeerSource::Configured,
2151            }],
2152        };
2153        let output = format_nodes(&resp);
2154        assert!(output.contains("offline"));
2155        // No session count → shows "-"
2156        let lines: Vec<&str> = output.lines().collect();
2157        assert!(lines[2].contains('-'));
2158    }
2159
2160    #[tokio::test]
2161    async fn test_execute_resume_connection_refused() {
2162        let cli = Cli {
2163            node: "localhost:1".into(),
2164            token: None,
2165            command: Commands::Resume {
2166                name: "test".into(),
2167            },
2168        };
2169        let result = execute(&cli).await;
2170        let err = result.unwrap_err().to_string();
2171        assert!(err.contains("Could not connect to pulpod"));
2172    }
2173
2174    #[tokio::test]
2175    async fn test_execute_spawn_connection_refused() {
2176        let cli = Cli {
2177            node: "localhost:1".into(),
2178            token: None,
2179            command: Commands::Spawn {
2180                workdir: Some("/tmp".into()),
2181                name: None,
2182                provider: Some("claude".into()),
2183                auto: false,
2184                unrestricted: false,
2185                model: None,
2186                system_prompt: None,
2187                allowed_tools: None,
2188                ink: None,
2189                max_turns: None,
2190                max_budget: None,
2191                output_format: None,
2192                prompt: vec!["test".into()],
2193            },
2194        };
2195        let result = execute(&cli).await;
2196        let err = result.unwrap_err().to_string();
2197        assert!(err.contains("Could not connect to pulpod"));
2198    }
2199
2200    #[tokio::test]
2201    async fn test_execute_kill_connection_refused() {
2202        let cli = Cli {
2203            node: "localhost:1".into(),
2204            token: None,
2205            command: Commands::Kill {
2206                name: "test".into(),
2207            },
2208        };
2209        let result = execute(&cli).await;
2210        let err = result.unwrap_err().to_string();
2211        assert!(err.contains("Could not connect to pulpod"));
2212    }
2213
2214    #[tokio::test]
2215    async fn test_execute_delete_connection_refused() {
2216        let cli = Cli {
2217            node: "localhost:1".into(),
2218            token: None,
2219            command: Commands::Delete {
2220                name: "test".into(),
2221            },
2222        };
2223        let result = execute(&cli).await;
2224        let err = result.unwrap_err().to_string();
2225        assert!(err.contains("Could not connect to pulpod"));
2226    }
2227
2228    #[tokio::test]
2229    async fn test_execute_logs_connection_refused() {
2230        let cli = Cli {
2231            node: "localhost:1".into(),
2232            token: None,
2233            command: Commands::Logs {
2234                name: "test".into(),
2235                lines: 50,
2236                follow: false,
2237            },
2238        };
2239        let result = execute(&cli).await;
2240        let err = result.unwrap_err().to_string();
2241        assert!(err.contains("Could not connect to pulpod"));
2242    }
2243
2244    #[tokio::test]
2245    async fn test_friendly_error_connect() {
2246        // Make a request to a closed port to get a connect error
2247        let err = reqwest::Client::new()
2248            .get("http://127.0.0.1:1")
2249            .send()
2250            .await
2251            .unwrap_err();
2252        let friendly = friendly_error(&err, "test-node:1");
2253        let msg = friendly.to_string();
2254        assert!(
2255            msg.contains("Could not connect"),
2256            "Expected connect message, got: {msg}"
2257        );
2258    }
2259
2260    #[tokio::test]
2261    async fn test_friendly_error_other() {
2262        // A request to an invalid URL creates a builder error, not a connect error
2263        let err = reqwest::Client::new()
2264            .get("http://[::invalid::url")
2265            .send()
2266            .await
2267            .unwrap_err();
2268        let friendly = friendly_error(&err, "bad-host");
2269        let msg = friendly.to_string();
2270        assert!(
2271            msg.contains("Network error"),
2272            "Expected network error message, got: {msg}"
2273        );
2274        assert!(msg.contains("bad-host"));
2275    }
2276
2277    // -- Auth helper tests --
2278
2279    #[test]
2280    fn test_is_localhost_variants() {
2281        assert!(is_localhost("localhost:7433"));
2282        assert!(is_localhost("127.0.0.1:7433"));
2283        assert!(is_localhost("[::1]:7433"));
2284        assert!(is_localhost("::1"));
2285        assert!(is_localhost("localhost"));
2286        assert!(!is_localhost("mac-mini:7433"));
2287        assert!(!is_localhost("192.168.1.100:7433"));
2288    }
2289
2290    #[test]
2291    fn test_authed_get_with_token() {
2292        let client = reqwest::Client::new();
2293        let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2294            .build()
2295            .unwrap();
2296        let auth = req
2297            .headers()
2298            .get("authorization")
2299            .unwrap()
2300            .to_str()
2301            .unwrap();
2302        assert_eq!(auth, "Bearer tok");
2303    }
2304
2305    #[test]
2306    fn test_authed_get_without_token() {
2307        let client = reqwest::Client::new();
2308        let req = authed_get(&client, "http://h:1/api".into(), None)
2309            .build()
2310            .unwrap();
2311        assert!(req.headers().get("authorization").is_none());
2312    }
2313
2314    #[test]
2315    fn test_authed_post_with_token() {
2316        let client = reqwest::Client::new();
2317        let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2318            .build()
2319            .unwrap();
2320        let auth = req
2321            .headers()
2322            .get("authorization")
2323            .unwrap()
2324            .to_str()
2325            .unwrap();
2326        assert_eq!(auth, "Bearer secret");
2327    }
2328
2329    #[test]
2330    fn test_authed_post_without_token() {
2331        let client = reqwest::Client::new();
2332        let req = authed_post(&client, "http://h:1/api".into(), None)
2333            .build()
2334            .unwrap();
2335        assert!(req.headers().get("authorization").is_none());
2336    }
2337
2338    #[test]
2339    fn test_authed_delete_with_token() {
2340        let client = reqwest::Client::new();
2341        let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2342            .build()
2343            .unwrap();
2344        let auth = req
2345            .headers()
2346            .get("authorization")
2347            .unwrap()
2348            .to_str()
2349            .unwrap();
2350        assert_eq!(auth, "Bearer del-tok");
2351    }
2352
2353    #[test]
2354    fn test_authed_delete_without_token() {
2355        let client = reqwest::Client::new();
2356        let req = authed_delete(&client, "http://h:1/api".into(), None)
2357            .build()
2358            .unwrap();
2359        assert!(req.headers().get("authorization").is_none());
2360    }
2361
2362    #[tokio::test]
2363    async fn test_resolve_token_explicit() {
2364        let client = reqwest::Client::new();
2365        let token =
2366            resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2367        assert_eq!(token, Some("my-tok".into()));
2368    }
2369
2370    #[tokio::test]
2371    async fn test_resolve_token_remote_no_explicit() {
2372        let client = reqwest::Client::new();
2373        let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2374        assert_eq!(token, None);
2375    }
2376
2377    #[tokio::test]
2378    async fn test_resolve_token_localhost_auto_discover() {
2379        use axum::{Json, Router, routing::get};
2380
2381        let app = Router::new().route(
2382            "/api/v1/auth/token",
2383            get(|| async {
2384                Json(AuthTokenResponse {
2385                    token: "discovered".into(),
2386                })
2387            }),
2388        );
2389        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2390        let addr = listener.local_addr().unwrap();
2391        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2392
2393        let node = format!("localhost:{}", addr.port());
2394        let base = base_url(&node);
2395        let client = reqwest::Client::new();
2396        let token = resolve_token(&client, &base, &node, None).await;
2397        assert_eq!(token, Some("discovered".into()));
2398    }
2399
2400    #[tokio::test]
2401    async fn test_discover_token_empty_returns_none() {
2402        use axum::{Json, Router, routing::get};
2403
2404        let app = Router::new().route(
2405            "/api/v1/auth/token",
2406            get(|| async {
2407                Json(AuthTokenResponse {
2408                    token: String::new(),
2409                })
2410            }),
2411        );
2412        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2413        let addr = listener.local_addr().unwrap();
2414        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2415
2416        let base = format!("http://127.0.0.1:{}", addr.port());
2417        let client = reqwest::Client::new();
2418        assert_eq!(discover_token(&client, &base).await, None);
2419    }
2420
2421    #[tokio::test]
2422    async fn test_discover_token_unreachable_returns_none() {
2423        let client = reqwest::Client::new();
2424        assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2425    }
2426
2427    #[test]
2428    fn test_cli_parse_with_token() {
2429        let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
2430        assert_eq!(cli.token, Some("my-secret".into()));
2431    }
2432
2433    #[test]
2434    fn test_cli_parse_without_token() {
2435        let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2436        assert_eq!(cli.token, None);
2437    }
2438
2439    #[tokio::test]
2440    async fn test_execute_with_explicit_token_sends_header() {
2441        use axum::{Router, extract::Request, http::StatusCode, routing::get};
2442
2443        let app = Router::new().route(
2444            "/api/v1/sessions",
2445            get(|req: Request| async move {
2446                let auth = req
2447                    .headers()
2448                    .get("authorization")
2449                    .and_then(|v| v.to_str().ok())
2450                    .unwrap_or("");
2451                assert_eq!(auth, "Bearer test-token");
2452                (StatusCode::OK, "[]".to_owned())
2453            }),
2454        );
2455        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2456        let addr = listener.local_addr().unwrap();
2457        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2458        let node = format!("127.0.0.1:{}", addr.port());
2459
2460        let cli = Cli {
2461            node,
2462            token: Some("test-token".into()),
2463            command: Commands::List,
2464        };
2465        let result = execute(&cli).await.unwrap();
2466        assert_eq!(result, "No sessions.");
2467    }
2468
2469    // -- Interventions tests --
2470
2471    #[test]
2472    fn test_cli_parse_interventions() {
2473        let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
2474        assert!(matches!(
2475            &cli.command,
2476            Commands::Interventions { name } if name == "my-session"
2477        ));
2478    }
2479
2480    #[test]
2481    fn test_format_interventions_empty() {
2482        assert_eq!(format_interventions(&[]), "No intervention events.");
2483    }
2484
2485    #[test]
2486    fn test_format_interventions_with_data() {
2487        let events = vec![
2488            InterventionEventResponse {
2489                id: 1,
2490                session_id: "sess-1".into(),
2491                reason: "Memory exceeded threshold".into(),
2492                created_at: "2026-01-01T00:00:00Z".into(),
2493            },
2494            InterventionEventResponse {
2495                id: 2,
2496                session_id: "sess-1".into(),
2497                reason: "Idle for 10 minutes".into(),
2498                created_at: "2026-01-02T00:00:00Z".into(),
2499            },
2500        ];
2501        let output = format_interventions(&events);
2502        assert!(output.contains("ID"));
2503        assert!(output.contains("TIMESTAMP"));
2504        assert!(output.contains("REASON"));
2505        assert!(output.contains("Memory exceeded threshold"));
2506        assert!(output.contains("Idle for 10 minutes"));
2507        assert!(output.contains("2026-01-01T00:00:00Z"));
2508    }
2509
2510    #[tokio::test]
2511    async fn test_execute_interventions_empty() {
2512        let node = start_test_server().await;
2513        let cli = Cli {
2514            node,
2515            token: None,
2516            command: Commands::Interventions {
2517                name: "my-session".into(),
2518            },
2519        };
2520        let result = execute(&cli).await.unwrap();
2521        assert_eq!(result, "No intervention events.");
2522    }
2523
2524    #[tokio::test]
2525    async fn test_execute_interventions_with_data() {
2526        use axum::{Router, routing::get};
2527
2528        let app = Router::new().route(
2529            "/api/v1/sessions/{id}/interventions",
2530            get(|| async {
2531                r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2532                    .to_owned()
2533            }),
2534        );
2535        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2536        let addr = listener.local_addr().unwrap();
2537        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2538        let node = format!("127.0.0.1:{}", addr.port());
2539
2540        let cli = Cli {
2541            node,
2542            token: None,
2543            command: Commands::Interventions {
2544                name: "test".into(),
2545            },
2546        };
2547        let result = execute(&cli).await.unwrap();
2548        assert!(result.contains("OOM"));
2549        assert!(result.contains("2026-01-01T00:00:00Z"));
2550    }
2551
2552    #[tokio::test]
2553    async fn test_execute_interventions_connection_refused() {
2554        let cli = Cli {
2555            node: "localhost:1".into(),
2556            token: None,
2557            command: Commands::Interventions {
2558                name: "test".into(),
2559            },
2560        };
2561        let result = execute(&cli).await;
2562        let err = result.unwrap_err().to_string();
2563        assert!(err.contains("Could not connect to pulpod"));
2564    }
2565
2566    // -- Attach command tests --
2567
2568    #[test]
2569    fn test_build_attach_command() {
2570        let cmd = build_attach_command("pulpo-my-session");
2571        assert_eq!(cmd.get_program(), "tmux");
2572        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2573        assert_eq!(args, vec!["attach-session", "-t", "pulpo-my-session"]);
2574    }
2575
2576    #[test]
2577    fn test_cli_parse_attach() {
2578        let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2579        assert!(matches!(
2580            &cli.command,
2581            Commands::Attach { name } if name == "my-session"
2582        ));
2583    }
2584
2585    #[test]
2586    fn test_cli_parse_attach_alias() {
2587        let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2588        assert!(matches!(
2589            &cli.command,
2590            Commands::Attach { name } if name == "my-session"
2591        ));
2592    }
2593
2594    #[tokio::test]
2595    async fn test_execute_attach_success() {
2596        let node = start_test_server().await;
2597        let cli = Cli {
2598            node,
2599            token: None,
2600            command: Commands::Attach {
2601                name: "test-session".into(),
2602            },
2603        };
2604        let result = execute(&cli).await.unwrap();
2605        assert!(result.contains("Detached from session test-session"));
2606    }
2607
2608    #[tokio::test]
2609    async fn test_execute_attach_with_backend_session_id() {
2610        use axum::{Router, routing::get};
2611        let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","provider":"claude","prompt":"test","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"pulpo-my-session","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2612        let app = Router::new().route(
2613            "/api/v1/sessions/{id}",
2614            get(move || async move { session_json.to_owned() }),
2615        );
2616        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2617        let addr = listener.local_addr().unwrap();
2618        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2619
2620        let cli = Cli {
2621            node: format!("127.0.0.1:{}", addr.port()),
2622            token: None,
2623            command: Commands::Attach {
2624                name: "my-session".into(),
2625            },
2626        };
2627        let result = execute(&cli).await.unwrap();
2628        assert!(result.contains("Detached from session my-session"));
2629    }
2630
2631    #[tokio::test]
2632    async fn test_execute_attach_connection_refused() {
2633        let cli = Cli {
2634            node: "localhost:1".into(),
2635            token: None,
2636            command: Commands::Attach {
2637                name: "test-session".into(),
2638            },
2639        };
2640        let result = execute(&cli).await;
2641        let err = result.unwrap_err().to_string();
2642        assert!(err.contains("Could not connect to pulpod"));
2643    }
2644
2645    #[tokio::test]
2646    async fn test_execute_attach_error_response() {
2647        use axum::{Router, http::StatusCode, routing::get};
2648        let app = Router::new().route(
2649            "/api/v1/sessions/{id}",
2650            get(|| async {
2651                (
2652                    StatusCode::NOT_FOUND,
2653                    r#"{"error":"session not found"}"#.to_owned(),
2654                )
2655            }),
2656        );
2657        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2658        let addr = listener.local_addr().unwrap();
2659        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2660
2661        let cli = Cli {
2662            node: format!("127.0.0.1:{}", addr.port()),
2663            token: None,
2664            command: Commands::Attach {
2665                name: "nonexistent".into(),
2666            },
2667        };
2668        let result = execute(&cli).await;
2669        let err = result.unwrap_err().to_string();
2670        assert!(err.contains("session not found"));
2671    }
2672
2673    // -- Alias parse tests --
2674
2675    #[test]
2676    fn test_cli_parse_alias_spawn() {
2677        let cli = Cli::try_parse_from(["pulpo", "s", "--workdir", "/tmp", "Do it"]).unwrap();
2678        assert!(matches!(&cli.command, Commands::Spawn { .. }));
2679    }
2680
2681    #[test]
2682    fn test_cli_parse_alias_list() {
2683        let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2684        assert!(matches!(cli.command, Commands::List));
2685    }
2686
2687    #[test]
2688    fn test_cli_parse_alias_logs() {
2689        let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2690        assert!(matches!(
2691            &cli.command,
2692            Commands::Logs { name, .. } if name == "my-session"
2693        ));
2694    }
2695
2696    #[test]
2697    fn test_cli_parse_alias_kill() {
2698        let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2699        assert!(matches!(
2700            &cli.command,
2701            Commands::Kill { name } if name == "my-session"
2702        ));
2703    }
2704
2705    #[test]
2706    fn test_cli_parse_alias_delete() {
2707        let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2708        assert!(matches!(
2709            &cli.command,
2710            Commands::Delete { name } if name == "my-session"
2711        ));
2712    }
2713
2714    #[test]
2715    fn test_cli_parse_alias_resume() {
2716        let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
2717        assert!(matches!(
2718            &cli.command,
2719            Commands::Resume { name } if name == "my-session"
2720        ));
2721    }
2722
2723    #[test]
2724    fn test_cli_parse_alias_nodes() {
2725        let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
2726        assert!(matches!(cli.command, Commands::Nodes));
2727    }
2728
2729    #[test]
2730    fn test_cli_parse_alias_interventions() {
2731        let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
2732        assert!(matches!(
2733            &cli.command,
2734            Commands::Interventions { name } if name == "my-session"
2735        ));
2736    }
2737
2738    #[test]
2739    fn test_api_error_json() {
2740        let err = api_error("{\"error\":\"session not found: foo\"}");
2741        assert_eq!(err.to_string(), "session not found: foo");
2742    }
2743
2744    #[test]
2745    fn test_api_error_plain_text() {
2746        let err = api_error("plain text error");
2747        assert_eq!(err.to_string(), "plain text error");
2748    }
2749
2750    // -- diff_output tests --
2751
2752    #[test]
2753    fn test_diff_output_empty_prev() {
2754        assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
2755    }
2756
2757    #[test]
2758    fn test_diff_output_identical() {
2759        assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
2760    }
2761
2762    #[test]
2763    fn test_diff_output_new_lines_appended() {
2764        let prev = "line1\nline2";
2765        let new = "line1\nline2\nline3\nline4";
2766        assert_eq!(diff_output(prev, new), "line3\nline4");
2767    }
2768
2769    #[test]
2770    fn test_diff_output_scrolled_window() {
2771        // Window of 3 lines: old lines scroll off top, new appear at bottom
2772        let prev = "line1\nline2\nline3";
2773        let new = "line2\nline3\nline4";
2774        assert_eq!(diff_output(prev, new), "line4");
2775    }
2776
2777    #[test]
2778    fn test_diff_output_completely_different() {
2779        let prev = "aaa\nbbb";
2780        let new = "xxx\nyyy";
2781        assert_eq!(diff_output(prev, new), "xxx\nyyy");
2782    }
2783
2784    #[test]
2785    fn test_diff_output_last_line_matches_but_overlap_fails() {
2786        // Last line of prev appears in new but preceding lines don't match
2787        let prev = "aaa\ncommon";
2788        let new = "zzz\ncommon\nnew_line";
2789        // "common" matches at index 1 of new, overlap_len = min(2, 2) = 2
2790        // prev_tail = ["aaa", "common"], new_overlap = ["zzz", "common"] — mismatch
2791        // Falls through, no verified overlap, so returns everything
2792        assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
2793    }
2794
2795    #[test]
2796    fn test_diff_output_new_empty() {
2797        assert_eq!(diff_output("line1", ""), "");
2798    }
2799
2800    // -- follow_logs tests --
2801
2802    /// Start a test server that simulates evolving output and session status transitions.
2803    async fn start_follow_test_server() -> String {
2804        use axum::{Router, extract::Path, extract::Query, routing::get};
2805        use std::sync::Arc;
2806        use std::sync::atomic::{AtomicUsize, Ordering};
2807
2808        let call_count = Arc::new(AtomicUsize::new(0));
2809        let output_count = call_count.clone();
2810        let status_count = Arc::new(AtomicUsize::new(0));
2811        let status_count_inner = status_count.clone();
2812
2813        let app = Router::new()
2814            .route(
2815                "/api/v1/sessions/{id}/output",
2816                get(
2817                    move |_path: Path<String>,
2818                          _query: Query<std::collections::HashMap<String, String>>| {
2819                        let count = output_count.clone();
2820                        async move {
2821                            let n = count.fetch_add(1, Ordering::SeqCst);
2822                            let output = match n {
2823                                0 => "line1\nline2".to_owned(),
2824                                1 => "line1\nline2\nline3".to_owned(),
2825                                _ => "line2\nline3\nline4".to_owned(),
2826                            };
2827                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2828                        }
2829                    },
2830                ),
2831            )
2832            .route(
2833                "/api/v1/sessions/{id}",
2834                get(move |_path: Path<String>| {
2835                    let count = status_count_inner.clone();
2836                    async move {
2837                        let n = count.fetch_add(1, Ordering::SeqCst);
2838                        let status = if n < 2 { "running" } else { "completed" };
2839                        format!(
2840                            r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
2841                        )
2842                    }
2843                }),
2844            );
2845
2846        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2847        let addr = listener.local_addr().unwrap();
2848        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2849        format!("http://127.0.0.1:{}", addr.port())
2850    }
2851
2852    #[tokio::test]
2853    async fn test_follow_logs_polls_and_exits_on_completed() {
2854        let base = start_follow_test_server().await;
2855        let client = reqwest::Client::new();
2856        let mut buf = Vec::new();
2857
2858        follow_logs(&client, &base, "test", 100, None, &mut buf)
2859            .await
2860            .unwrap();
2861
2862        let output = String::from_utf8(buf).unwrap();
2863        // Should contain initial output + new lines
2864        assert!(output.contains("line1"));
2865        assert!(output.contains("line2"));
2866        assert!(output.contains("line3"));
2867        assert!(output.contains("line4"));
2868    }
2869
2870    #[tokio::test]
2871    async fn test_execute_logs_follow_success() {
2872        let base = start_follow_test_server().await;
2873        // Extract host:port from http://127.0.0.1:PORT
2874        let node = base.strip_prefix("http://").unwrap().to_owned();
2875
2876        let cli = Cli {
2877            node,
2878            token: None,
2879            command: Commands::Logs {
2880                name: "test".into(),
2881                lines: 100,
2882                follow: true,
2883            },
2884        };
2885        // execute() with follow writes to stdout and returns empty string
2886        let result = execute(&cli).await.unwrap();
2887        assert_eq!(result, "");
2888    }
2889
2890    #[tokio::test]
2891    async fn test_execute_logs_follow_connection_refused() {
2892        let cli = Cli {
2893            node: "localhost:1".into(),
2894            token: None,
2895            command: Commands::Logs {
2896                name: "test".into(),
2897                lines: 50,
2898                follow: true,
2899            },
2900        };
2901        let result = execute(&cli).await;
2902        let err = result.unwrap_err().to_string();
2903        assert!(
2904            err.contains("Could not connect to pulpod"),
2905            "Expected friendly error, got: {err}"
2906        );
2907    }
2908
2909    #[tokio::test]
2910    async fn test_follow_logs_exits_on_dead() {
2911        use axum::{Router, extract::Path, extract::Query, routing::get};
2912
2913        let app = Router::new()
2914            .route(
2915                "/api/v1/sessions/{id}/output",
2916                get(
2917                    |_path: Path<String>,
2918                     _query: Query<std::collections::HashMap<String, String>>| async {
2919                        r#"{"output":"some output"}"#.to_owned()
2920                    },
2921                ),
2922            )
2923            .route(
2924                "/api/v1/sessions/{id}",
2925                get(|_path: Path<String>| async {
2926                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2927                }),
2928            );
2929
2930        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2931        let addr = listener.local_addr().unwrap();
2932        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2933        let base = format!("http://127.0.0.1:{}", addr.port());
2934
2935        let client = reqwest::Client::new();
2936        let mut buf = Vec::new();
2937        follow_logs(&client, &base, "test", 100, None, &mut buf)
2938            .await
2939            .unwrap();
2940
2941        let output = String::from_utf8(buf).unwrap();
2942        assert!(output.contains("some output"));
2943    }
2944
2945    #[tokio::test]
2946    async fn test_follow_logs_exits_on_stale() {
2947        use axum::{Router, extract::Path, extract::Query, routing::get};
2948
2949        let app = Router::new()
2950            .route(
2951                "/api/v1/sessions/{id}/output",
2952                get(
2953                    |_path: Path<String>,
2954                     _query: Query<std::collections::HashMap<String, String>>| async {
2955                        r#"{"output":"stale output"}"#.to_owned()
2956                    },
2957                ),
2958            )
2959            .route(
2960                "/api/v1/sessions/{id}",
2961                get(|_path: Path<String>| async {
2962                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2963                }),
2964            );
2965
2966        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2967        let addr = listener.local_addr().unwrap();
2968        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2969        let base = format!("http://127.0.0.1:{}", addr.port());
2970
2971        let client = reqwest::Client::new();
2972        let mut buf = Vec::new();
2973        follow_logs(&client, &base, "test", 100, None, &mut buf)
2974            .await
2975            .unwrap();
2976
2977        let output = String::from_utf8(buf).unwrap();
2978        assert!(output.contains("stale output"));
2979    }
2980
2981    #[tokio::test]
2982    async fn test_execute_logs_follow_non_reqwest_error() {
2983        use axum::{Router, extract::Path, extract::Query, routing::get};
2984
2985        // Session status endpoint returns invalid JSON to trigger a serde error
2986        let app = Router::new()
2987            .route(
2988                "/api/v1/sessions/{id}/output",
2989                get(
2990                    |_path: Path<String>,
2991                     _query: Query<std::collections::HashMap<String, String>>| async {
2992                        r#"{"output":"initial"}"#.to_owned()
2993                    },
2994                ),
2995            )
2996            .route(
2997                "/api/v1/sessions/{id}",
2998                get(|_path: Path<String>| async { "not valid json".to_owned() }),
2999            );
3000
3001        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3002        let addr = listener.local_addr().unwrap();
3003        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3004        let node = format!("127.0.0.1:{}", addr.port());
3005
3006        let cli = Cli {
3007            node,
3008            token: None,
3009            command: Commands::Logs {
3010                name: "test".into(),
3011                lines: 100,
3012                follow: true,
3013            },
3014        };
3015        let err = execute(&cli).await.unwrap_err();
3016        // serde_json error, not a reqwest error — hits the Err(other) branch
3017        let msg = err.to_string();
3018        assert!(
3019            msg.contains("expected ident"),
3020            "Expected serde parse error, got: {msg}"
3021        );
3022    }
3023
3024    #[test]
3025    fn test_cli_parse_spawn_with_guardrails() {
3026        let cli = Cli::try_parse_from([
3027            "pulpo",
3028            "spawn",
3029            "--workdir",
3030            "/tmp",
3031            "--max-turns",
3032            "10",
3033            "--max-budget",
3034            "5.5",
3035            "--output-format",
3036            "json",
3037            "Do it",
3038        ])
3039        .unwrap();
3040        assert!(matches!(
3041            &cli.command,
3042            Commands::Spawn { max_turns, max_budget, output_format, .. }
3043                if *max_turns == Some(10) && *max_budget == Some(5.5)
3044                && output_format.as_deref() == Some("json")
3045        ));
3046    }
3047
3048    #[tokio::test]
3049    async fn test_fetch_session_status_connection_error() {
3050        let client = reqwest::Client::new();
3051        let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3052        assert!(result.is_err());
3053    }
3054
3055    // -- Crontab wrapper tests --
3056
3057    #[test]
3058    fn test_build_crontab_line() {
3059        let line = build_crontab_line(
3060            "nightly-review",
3061            "0 3 * * *",
3062            "/home/me/repo",
3063            "claude",
3064            "Review PRs",
3065            "localhost:7433",
3066        );
3067        assert_eq!(
3068            line,
3069            "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n"
3070        );
3071    }
3072
3073    #[test]
3074    fn test_crontab_install_success() {
3075        let crontab = "# existing cron\n0 * * * * echo hi\n";
3076        let line = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";
3077        let result = crontab_install(crontab, "my-job", line).unwrap();
3078        assert!(result.starts_with("# existing cron\n"));
3079        assert!(result.ends_with("#pulpo:my-job\n"));
3080        assert!(result.contains("echo hi"));
3081    }
3082
3083    #[test]
3084    fn test_crontab_install_duplicate_error() {
3085        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3086        let line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
3087        let err = crontab_install(crontab, "my-job", line).unwrap_err();
3088        assert!(err.to_string().contains("already exists"));
3089    }
3090
3091    #[test]
3092    fn test_crontab_list_empty() {
3093        assert_eq!(crontab_list(""), "No pulpo schedules.");
3094    }
3095
3096    #[test]
3097    fn test_crontab_list_no_pulpo_entries() {
3098        assert_eq!(crontab_list("0 * * * * echo hi\n"), "No pulpo schedules.");
3099    }
3100
3101    #[test]
3102    fn test_crontab_list_with_entries() {
3103        let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
3104        let output = crontab_list(crontab);
3105        assert!(output.contains("NAME"));
3106        assert!(output.contains("CRON"));
3107        assert!(output.contains("PAUSED"));
3108        assert!(output.contains("nightly"));
3109        assert!(output.contains("0 3 * * *"));
3110        assert!(output.contains("no"));
3111    }
3112
3113    #[test]
3114    fn test_crontab_list_paused_entry() {
3115        let crontab = "#0 3 * * * pulpo spawn task #pulpo:paused-job\n";
3116        let output = crontab_list(crontab);
3117        assert!(output.contains("paused-job"));
3118        assert!(output.contains("yes"));
3119    }
3120
3121    #[test]
3122    fn test_crontab_list_short_line() {
3123        // A line with fewer than 5 space-separated parts but still tagged
3124        let crontab = "badcron #pulpo:broken\n";
3125        let output = crontab_list(crontab);
3126        assert!(output.contains("broken"));
3127        assert!(output.contains('?'));
3128    }
3129
3130    #[test]
3131    fn test_crontab_remove_success() {
3132        let crontab = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
3133        let result = crontab_remove(crontab, "my-job").unwrap();
3134        assert!(result.contains("echo hi"));
3135        assert!(!result.contains("my-job"));
3136    }
3137
3138    #[test]
3139    fn test_crontab_remove_not_found() {
3140        let crontab = "0 * * * * echo hi\n";
3141        let err = crontab_remove(crontab, "ghost").unwrap_err();
3142        assert!(err.to_string().contains("not found"));
3143    }
3144
3145    #[test]
3146    fn test_crontab_pause_success() {
3147        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3148        let result = crontab_pause(crontab, "my-job").unwrap();
3149        assert!(result.starts_with('#'));
3150        assert!(result.contains("#pulpo:my-job"));
3151    }
3152
3153    #[test]
3154    fn test_crontab_pause_not_found() {
3155        let crontab = "0 * * * * echo hi\n";
3156        let err = crontab_pause(crontab, "ghost").unwrap_err();
3157        assert!(err.to_string().contains("not found or already paused"));
3158    }
3159
3160    #[test]
3161    fn test_crontab_pause_already_paused() {
3162        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3163        let err = crontab_pause(crontab, "my-job").unwrap_err();
3164        assert!(err.to_string().contains("already paused"));
3165    }
3166
3167    #[test]
3168    fn test_crontab_resume_success() {
3169        let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3170        let result = crontab_resume(crontab, "my-job").unwrap();
3171        assert!(!result.starts_with('#'));
3172        assert!(result.contains("#pulpo:my-job"));
3173    }
3174
3175    #[test]
3176    fn test_crontab_resume_not_found() {
3177        let crontab = "0 * * * * echo hi\n";
3178        let err = crontab_resume(crontab, "ghost").unwrap_err();
3179        assert!(err.to_string().contains("not found or not paused"));
3180    }
3181
3182    #[test]
3183    fn test_crontab_resume_not_paused() {
3184        let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3185        let err = crontab_resume(crontab, "my-job").unwrap_err();
3186        assert!(err.to_string().contains("not paused"));
3187    }
3188
3189    // -- Schedule CLI parse tests --
3190
3191    #[test]
3192    fn test_cli_parse_schedule_install() {
3193        let cli = Cli::try_parse_from([
3194            "pulpo",
3195            "schedule",
3196            "install",
3197            "nightly",
3198            "0 3 * * *",
3199            "--workdir",
3200            "/tmp/repo",
3201            "Review",
3202            "PRs",
3203        ])
3204        .unwrap();
3205        assert!(matches!(
3206            &cli.command,
3207            Commands::Schedule {
3208                action: ScheduleAction::Install { name, cron, workdir, provider, prompt }
3209            } if name == "nightly" && cron == "0 3 * * *" && workdir == "/tmp/repo"
3210              && provider == "claude" && prompt == &["Review", "PRs"]
3211        ));
3212    }
3213
3214    #[test]
3215    fn test_cli_parse_schedule_list() {
3216        let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3217        assert!(matches!(
3218            &cli.command,
3219            Commands::Schedule {
3220                action: ScheduleAction::List
3221            }
3222        ));
3223    }
3224
3225    #[test]
3226    fn test_cli_parse_schedule_remove() {
3227        let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3228        assert!(matches!(
3229            &cli.command,
3230            Commands::Schedule {
3231                action: ScheduleAction::Remove { name }
3232            } if name == "nightly"
3233        ));
3234    }
3235
3236    #[test]
3237    fn test_cli_parse_schedule_pause() {
3238        let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3239        assert!(matches!(
3240            &cli.command,
3241            Commands::Schedule {
3242                action: ScheduleAction::Pause { name }
3243            } if name == "nightly"
3244        ));
3245    }
3246
3247    #[test]
3248    fn test_cli_parse_schedule_resume() {
3249        let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3250        assert!(matches!(
3251            &cli.command,
3252            Commands::Schedule {
3253                action: ScheduleAction::Resume { name }
3254            } if name == "nightly"
3255        ));
3256    }
3257
3258    #[test]
3259    fn test_cli_parse_schedule_alias() {
3260        let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3261        assert!(matches!(
3262            &cli.command,
3263            Commands::Schedule {
3264                action: ScheduleAction::List
3265            }
3266        ));
3267    }
3268
3269    #[test]
3270    fn test_cli_parse_schedule_list_alias() {
3271        let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3272        assert!(matches!(
3273            &cli.command,
3274            Commands::Schedule {
3275                action: ScheduleAction::List
3276            }
3277        ));
3278    }
3279
3280    #[test]
3281    fn test_cli_parse_schedule_remove_alias() {
3282        let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3283        assert!(matches!(
3284            &cli.command,
3285            Commands::Schedule {
3286                action: ScheduleAction::Remove { name }
3287            } if name == "nightly"
3288        ));
3289    }
3290
3291    #[test]
3292    fn test_cli_parse_schedule_install_custom_provider() {
3293        let cli = Cli::try_parse_from([
3294            "pulpo",
3295            "schedule",
3296            "install",
3297            "daily",
3298            "0 9 * * *",
3299            "--workdir",
3300            "/tmp",
3301            "--provider",
3302            "codex",
3303            "Run tests",
3304        ])
3305        .unwrap();
3306        assert!(matches!(
3307            &cli.command,
3308            Commands::Schedule {
3309                action: ScheduleAction::Install { provider, .. }
3310            } if provider == "codex"
3311        ));
3312    }
3313
3314    #[tokio::test]
3315    async fn test_execute_schedule_via_execute() {
3316        // Under coverage builds, execute_schedule is a stub returning Ok("")
3317        let node = start_test_server().await;
3318        let cli = Cli {
3319            node,
3320            token: None,
3321            command: Commands::Schedule {
3322                action: ScheduleAction::List,
3323            },
3324        };
3325        let result = execute(&cli).await;
3326        // Under coverage: succeeds with empty string; under non-coverage: may fail (no crontab)
3327        assert!(result.is_ok() || result.is_err());
3328    }
3329
3330    #[test]
3331    fn test_schedule_action_debug() {
3332        let action = ScheduleAction::List;
3333        assert_eq!(format!("{action:?}"), "List");
3334    }
3335
3336    // ── Knowledge CLI tests ─────────────────────────────────────────────
3337
3338    #[test]
3339    fn test_cli_parse_knowledge() {
3340        let cli = Cli::try_parse_from(["pulpo", "knowledge"]).unwrap();
3341        assert!(matches!(cli.command, Commands::Knowledge { .. }));
3342    }
3343
3344    #[test]
3345    fn test_cli_parse_knowledge_alias() {
3346        let cli = Cli::try_parse_from(["pulpo", "kn"]).unwrap();
3347        assert!(matches!(cli.command, Commands::Knowledge { .. }));
3348    }
3349
3350    #[test]
3351    fn test_cli_parse_knowledge_with_filters() {
3352        let cli = Cli::try_parse_from([
3353            "pulpo",
3354            "knowledge",
3355            "--kind",
3356            "failure",
3357            "--repo",
3358            "/tmp/repo",
3359            "--ink",
3360            "coder",
3361            "--limit",
3362            "5",
3363        ])
3364        .unwrap();
3365        match &cli.command {
3366            Commands::Knowledge {
3367                kind,
3368                repo,
3369                ink,
3370                limit,
3371                ..
3372            } => {
3373                assert_eq!(kind.as_deref(), Some("failure"));
3374                assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3375                assert_eq!(ink.as_deref(), Some("coder"));
3376                assert_eq!(*limit, 5);
3377            }
3378            _ => panic!("expected Knowledge command"),
3379        }
3380    }
3381
3382    #[test]
3383    fn test_cli_parse_knowledge_context() {
3384        let cli = Cli::try_parse_from(["pulpo", "knowledge", "--context", "--repo", "/tmp/repo"])
3385            .unwrap();
3386        match &cli.command {
3387            Commands::Knowledge { context, repo, .. } => {
3388                assert!(*context);
3389                assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3390            }
3391            _ => panic!("expected Knowledge command"),
3392        }
3393    }
3394
3395    #[test]
3396    fn test_format_knowledge_empty() {
3397        assert_eq!(format_knowledge(&[]), "No knowledge found.");
3398    }
3399
3400    #[test]
3401    fn test_format_knowledge_items() {
3402        use chrono::Utc;
3403        use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3404        use uuid::Uuid;
3405
3406        let items = vec![
3407            Knowledge {
3408                id: Uuid::new_v4(),
3409                session_id: Uuid::new_v4(),
3410                kind: KnowledgeKind::Summary,
3411                scope_repo: Some("/tmp/repo".into()),
3412                scope_ink: Some("coder".into()),
3413                title: "Fixed the auth bug".into(),
3414                body: "Details".into(),
3415                tags: vec!["claude".into(), "completed".into()],
3416                relevance: 0.7,
3417                created_at: Utc::now(),
3418            },
3419            Knowledge {
3420                id: Uuid::new_v4(),
3421                session_id: Uuid::new_v4(),
3422                kind: KnowledgeKind::Failure,
3423                scope_repo: None,
3424                scope_ink: None,
3425                title: "OOM crash during build".into(),
3426                body: "Details".into(),
3427                tags: vec!["failure".into()],
3428                relevance: 0.9,
3429                created_at: Utc::now(),
3430            },
3431        ];
3432
3433        let output = format_knowledge(&items);
3434        assert!(output.contains("KIND"));
3435        assert!(output.contains("TITLE"));
3436        assert!(output.contains("summary"));
3437        assert!(output.contains("failure"));
3438        assert!(output.contains("Fixed the auth bug"));
3439        assert!(output.contains("repo"));
3440        assert!(output.contains("0.70"));
3441    }
3442
3443    #[test]
3444    fn test_format_knowledge_long_title_truncated() {
3445        use chrono::Utc;
3446        use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3447        use uuid::Uuid;
3448
3449        let items = vec![Knowledge {
3450            id: Uuid::new_v4(),
3451            session_id: Uuid::new_v4(),
3452            kind: KnowledgeKind::Summary,
3453            scope_repo: Some("/repo".into()),
3454            scope_ink: None,
3455            title: "A very long title that exceeds the maximum display width for knowledge items in the CLI".into(),
3456            body: "Body".into(),
3457            tags: vec![],
3458            relevance: 0.5,
3459            created_at: Utc::now(),
3460        }];
3461
3462        let output = format_knowledge(&items);
3463        assert!(output.contains('…'));
3464    }
3465
3466    #[test]
3467    fn test_cli_parse_knowledge_get() {
3468        let cli = Cli::try_parse_from(["pulpo", "knowledge", "--get", "abc-123"]).unwrap();
3469        match &cli.command {
3470            Commands::Knowledge { get, .. } => {
3471                assert_eq!(get.as_deref(), Some("abc-123"));
3472            }
3473            _ => panic!("expected Knowledge command"),
3474        }
3475    }
3476
3477    #[test]
3478    fn test_cli_parse_knowledge_delete() {
3479        let cli = Cli::try_parse_from(["pulpo", "knowledge", "--delete", "abc-123"]).unwrap();
3480        match &cli.command {
3481            Commands::Knowledge { delete, .. } => {
3482                assert_eq!(delete.as_deref(), Some("abc-123"));
3483            }
3484            _ => panic!("expected Knowledge command"),
3485        }
3486    }
3487
3488    #[test]
3489    fn test_cli_parse_knowledge_push() {
3490        let cli = Cli::try_parse_from(["pulpo", "knowledge", "--push"]).unwrap();
3491        match &cli.command {
3492            Commands::Knowledge { push, .. } => {
3493                assert!(*push);
3494            }
3495            _ => panic!("expected Knowledge command"),
3496        }
3497    }
3498
3499    #[test]
3500    fn test_format_knowledge_no_repo() {
3501        use chrono::Utc;
3502        use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3503        use uuid::Uuid;
3504
3505        let items = vec![Knowledge {
3506            id: Uuid::new_v4(),
3507            session_id: Uuid::new_v4(),
3508            kind: KnowledgeKind::Summary,
3509            scope_repo: None,
3510            scope_ink: None,
3511            title: "Global finding".into(),
3512            body: "Body".into(),
3513            tags: vec![],
3514            relevance: 0.5,
3515            created_at: Utc::now(),
3516        }];
3517
3518        let output = format_knowledge(&items);
3519        assert!(output.contains('-')); // "-" for no repo
3520    }
3521}