1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{AuthTokenResponse, InterventionEventResponse, PeersResponse};
4use pulpo_common::session::Session;
5
6#[derive(Parser, Debug)]
7#[command(
8 name = "pulpo",
9 about = "Manage agent sessions across your machines",
10 version
11)]
12pub struct Cli {
13 #[arg(long, default_value = "localhost:7433")]
15 pub node: String,
16
17 #[arg(long)]
19 pub token: Option<String>,
20
21 #[command(subcommand)]
22 pub command: Commands,
23}
24
25#[derive(Subcommand, Debug)]
26#[allow(clippy::large_enum_variant)]
27pub enum Commands {
28 #[command(visible_alias = "a")]
30 Attach {
31 name: String,
33 },
34
35 #[command(visible_alias = "i")]
37 Input {
38 name: String,
40 text: Option<String>,
42 },
43
44 #[command(visible_alias = "s")]
46 Spawn {
47 #[arg(long)]
49 workdir: String,
50
51 #[arg(long)]
53 name: Option<String>,
54
55 #[arg(long, default_value = "claude")]
57 provider: String,
58
59 #[arg(long)]
61 auto: bool,
62
63 #[arg(long, default_value = "standard")]
65 guard: String,
66
67 #[arg(long)]
69 model: Option<String>,
70
71 #[arg(long)]
73 system_prompt: Option<String>,
74
75 #[arg(long, value_delimiter = ',')]
77 allowed_tools: Option<Vec<String>>,
78
79 #[arg(long)]
81 persona: Option<String>,
82
83 #[arg(long)]
85 max_turns: Option<u32>,
86
87 #[arg(long)]
89 max_budget: Option<f64>,
90
91 #[arg(long)]
93 output_format: Option<String>,
94
95 prompt: Vec<String>,
97 },
98
99 #[command(visible_alias = "ls")]
101 List,
102
103 #[command(visible_alias = "l")]
105 Logs {
106 name: String,
108
109 #[arg(long, default_value = "100")]
111 lines: usize,
112
113 #[arg(short, long)]
115 follow: bool,
116 },
117
118 #[command(visible_alias = "k")]
120 Kill {
121 name: String,
123 },
124
125 #[command(visible_alias = "rm")]
127 Delete {
128 name: String,
130 },
131
132 #[command(visible_alias = "r")]
134 Resume {
135 name: String,
137 },
138
139 #[command(visible_alias = "n")]
141 Nodes,
142
143 #[command(visible_alias = "iv")]
145 Interventions {
146 name: String,
148 },
149
150 Ui,
152
153 #[command(visible_alias = "sched")]
155 Schedule {
156 #[command(subcommand)]
157 action: ScheduleAction,
158 },
159}
160
161#[derive(Subcommand, Debug)]
162pub enum ScheduleAction {
163 Install {
165 name: String,
167 cron: String,
169 #[arg(long)]
171 workdir: String,
172 #[arg(long, default_value = "claude")]
174 provider: String,
175 prompt: Vec<String>,
177 },
178 #[command(alias = "ls")]
180 List,
181 #[command(alias = "rm")]
183 Remove {
184 name: String,
186 },
187 Pause {
189 name: String,
191 },
192 Resume {
194 name: String,
196 },
197}
198
199pub fn base_url(node: &str) -> String {
201 format!("http://{node}")
202}
203
204#[derive(serde::Deserialize)]
206struct OutputResponse {
207 output: String,
208}
209
210fn format_sessions(sessions: &[Session]) -> String {
212 if sessions.is_empty() {
213 return "No sessions.".into();
214 }
215 let mut lines = vec![format!(
216 "{:<20} {:<12} {:<10} {:<14} {}",
217 "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
218 )];
219 for s in sessions {
220 let prompt_display = if s.prompt.len() > 40 {
221 format!("{}...", &s.prompt[..37])
222 } else {
223 s.prompt.clone()
224 };
225 let status_display = if s.waiting_for_input {
226 "waiting".to_owned()
227 } else {
228 s.status.to_string()
229 };
230 lines.push(format!(
231 "{:<20} {:<12} {:<10} {:<14} {}",
232 s.name, status_display, s.provider, s.mode, prompt_display
233 ));
234 }
235 lines.join("\n")
236}
237
238fn format_nodes(resp: &PeersResponse) -> String {
240 let mut lines = vec![format!(
241 "{:<20} {:<25} {:<10} {}",
242 "NAME", "ADDRESS", "STATUS", "SESSIONS"
243 )];
244 lines.push(format!(
245 "{:<20} {:<25} {:<10} {}",
246 resp.local.name, "(local)", "online", "-"
247 ));
248 for p in &resp.peers {
249 let sessions = p
250 .session_count
251 .map_or_else(|| "-".into(), |c| c.to_string());
252 lines.push(format!(
253 "{:<20} {:<25} {:<10} {}",
254 p.name, p.address, p.status, sessions
255 ));
256 }
257 lines.join("\n")
258}
259
260fn format_interventions(events: &[InterventionEventResponse]) -> String {
262 if events.is_empty() {
263 return "No intervention events.".into();
264 }
265 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
266 for e in events {
267 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
268 }
269 lines.join("\n")
270}
271
272#[cfg_attr(coverage, allow(dead_code))]
274fn build_open_command(url: &str) -> std::process::Command {
275 #[cfg(target_os = "macos")]
276 {
277 let mut cmd = std::process::Command::new("open");
278 cmd.arg(url);
279 cmd
280 }
281 #[cfg(target_os = "linux")]
282 {
283 let mut cmd = std::process::Command::new("xdg-open");
284 cmd.arg(url);
285 cmd
286 }
287 #[cfg(not(any(target_os = "macos", target_os = "linux")))]
288 {
289 let mut cmd = std::process::Command::new("xdg-open");
291 cmd.arg(url);
292 cmd
293 }
294}
295
296#[cfg(not(coverage))]
298fn open_browser(url: &str) -> Result<()> {
299 build_open_command(url).status()?;
300 Ok(())
301}
302
303#[cfg(coverage)]
305fn open_browser(_url: &str) -> Result<()> {
306 Ok(())
307}
308
309#[cfg_attr(coverage, allow(dead_code))]
312fn build_attach_command(name: &str) -> std::process::Command {
313 let mut cmd = std::process::Command::new("tmux");
314 cmd.args(["attach-session", "-t", &format!("pulpo-{name}")]);
315 cmd
316}
317
318#[cfg(not(any(test, coverage)))]
320fn attach_session(name: &str) -> Result<()> {
321 let status = build_attach_command(name).status()?;
322 if !status.success() {
323 anyhow::bail!("attach failed with {status}");
324 }
325 Ok(())
326}
327
328#[cfg(any(test, coverage))]
330#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
331fn attach_session(_name: &str) -> Result<()> {
332 Ok(())
333}
334
335fn api_error(text: &str) -> anyhow::Error {
337 serde_json::from_str::<serde_json::Value>(text)
338 .ok()
339 .and_then(|v| v["error"].as_str().map(String::from))
340 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
341}
342
343async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
345 if resp.status().is_success() {
346 Ok(resp.text().await?)
347 } else {
348 let text = resp.text().await?;
349 Err(api_error(&text))
350 }
351}
352
353fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
355 if err.is_connect() {
356 anyhow::anyhow!(
357 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
358 )
359 } else {
360 anyhow::anyhow!("Network error connecting to {node}: {err}")
361 }
362}
363
364fn is_localhost(node: &str) -> bool {
366 let host = node.split(':').next().unwrap_or(node);
367 host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
368}
369
370async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
372 let resp = client
373 .get(format!("{base}/api/v1/auth/token"))
374 .send()
375 .await
376 .ok()?;
377 let body: AuthTokenResponse = resp.json().await.ok()?;
378 if body.token.is_empty() {
379 None
380 } else {
381 Some(body.token)
382 }
383}
384
385async fn resolve_token(
387 client: &reqwest::Client,
388 base: &str,
389 node: &str,
390 explicit: Option<&str>,
391) -> Option<String> {
392 if let Some(t) = explicit {
393 return Some(t.to_owned());
394 }
395 if is_localhost(node) {
396 return discover_token(client, base).await;
397 }
398 None
399}
400
401fn authed_get(
403 client: &reqwest::Client,
404 url: String,
405 token: Option<&str>,
406) -> reqwest::RequestBuilder {
407 let req = client.get(url);
408 if let Some(t) = token {
409 req.bearer_auth(t)
410 } else {
411 req
412 }
413}
414
415fn authed_post(
417 client: &reqwest::Client,
418 url: String,
419 token: Option<&str>,
420) -> reqwest::RequestBuilder {
421 let req = client.post(url);
422 if let Some(t) = token {
423 req.bearer_auth(t)
424 } else {
425 req
426 }
427}
428
429fn authed_delete(
431 client: &reqwest::Client,
432 url: String,
433 token: Option<&str>,
434) -> reqwest::RequestBuilder {
435 let req = client.delete(url);
436 if let Some(t) = token {
437 req.bearer_auth(t)
438 } else {
439 req
440 }
441}
442
443async fn fetch_output(
445 client: &reqwest::Client,
446 base: &str,
447 name: &str,
448 lines: usize,
449 token: Option<&str>,
450) -> Result<String> {
451 let resp = authed_get(
452 client,
453 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
454 token,
455 )
456 .send()
457 .await?;
458 let text = ok_or_api_error(resp).await?;
459 let output: OutputResponse = serde_json::from_str(&text)?;
460 Ok(output.output)
461}
462
463async fn fetch_session_status(
465 client: &reqwest::Client,
466 base: &str,
467 name: &str,
468 token: Option<&str>,
469) -> Result<String> {
470 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
471 .send()
472 .await?;
473 let text = ok_or_api_error(resp).await?;
474 let session: Session = serde_json::from_str(&text)?;
475 Ok(session.status.to_string())
476}
477
478fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
485 if prev.is_empty() {
486 return new;
487 }
488
489 let prev_lines: Vec<&str> = prev.lines().collect();
490 let new_lines: Vec<&str> = new.lines().collect();
491
492 if new_lines.is_empty() {
493 return "";
494 }
495
496 let last_prev = prev_lines[prev_lines.len() - 1];
498
499 for i in (0..new_lines.len()).rev() {
501 if new_lines[i] == last_prev {
502 let overlap_len = prev_lines.len().min(i + 1);
504 let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
505 let new_overlap = &new_lines[i + 1 - overlap_len..=i];
506 if prev_tail == new_overlap {
507 if i + 1 < new_lines.len() {
508 let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
510 return new.get(consumed.min(new.len())..).unwrap_or("");
511 }
512 return "";
513 }
514 }
515 }
516
517 new
519}
520
521async fn follow_logs(
523 client: &reqwest::Client,
524 base: &str,
525 name: &str,
526 lines: usize,
527 token: Option<&str>,
528 writer: &mut (dyn std::io::Write + Send),
529) -> Result<()> {
530 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
531 write!(writer, "{prev_output}")?;
532
533 loop {
534 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
535
536 let status = fetch_session_status(client, base, name, token).await?;
538 let is_terminal = status == "completed" || status == "dead" || status == "stale";
539
540 let new_output = fetch_output(client, base, name, lines, token).await?;
542
543 let diff = diff_output(&prev_output, &new_output);
544 if !diff.is_empty() {
545 write!(writer, "{diff}")?;
546 }
547 prev_output = new_output;
548
549 if is_terminal {
550 break;
551 }
552 }
553 Ok(())
554}
555
556#[cfg_attr(coverage, allow(dead_code))]
559const CRONTAB_TAG: &str = "#pulpo:";
560
561#[cfg(not(coverage))]
563fn read_crontab() -> Result<String> {
564 let output = std::process::Command::new("crontab").arg("-l").output()?;
565 if output.status.success() {
566 Ok(String::from_utf8_lossy(&output.stdout).to_string())
567 } else {
568 Ok(String::new())
569 }
570}
571
572#[cfg(not(coverage))]
574fn write_crontab(content: &str) -> Result<()> {
575 use std::io::Write;
576 let mut child = std::process::Command::new("crontab")
577 .arg("-")
578 .stdin(std::process::Stdio::piped())
579 .spawn()?;
580 child
581 .stdin
582 .as_mut()
583 .unwrap()
584 .write_all(content.as_bytes())?;
585 let status = child.wait()?;
586 if !status.success() {
587 anyhow::bail!("crontab write failed");
588 }
589 Ok(())
590}
591
592#[cfg_attr(coverage, allow(dead_code))]
594fn build_crontab_line(
595 name: &str,
596 cron: &str,
597 workdir: &str,
598 provider: &str,
599 prompt: &str,
600 node: &str,
601) -> String {
602 format!(
603 "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
604 )
605}
606
607#[cfg_attr(coverage, allow(dead_code))]
609fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
610 let tag = format!("{CRONTAB_TAG}{name}");
611 if crontab.contains(&tag) {
612 anyhow::bail!("schedule \"{name}\" already exists — remove it first");
613 }
614 let mut result = crontab.to_owned();
615 result.push_str(line);
616 Ok(result)
617}
618
619#[cfg_attr(coverage, allow(dead_code))]
621fn crontab_list(crontab: &str) -> String {
622 let entries: Vec<&str> = crontab
623 .lines()
624 .filter(|l| l.contains(CRONTAB_TAG))
625 .collect();
626 if entries.is_empty() {
627 return "No pulpo schedules.".into();
628 }
629 let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
630 for entry in entries {
631 let paused = entry.starts_with('#');
632 let raw = entry.trim_start_matches('#').trim();
633 let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
634 let parts: Vec<&str> = raw.splitn(6, ' ').collect();
635 let cron_expr = if parts.len() >= 5 {
636 parts[..5].join(" ")
637 } else {
638 "?".into()
639 };
640 lines.push(format!(
641 "{:<20} {:<15} {}",
642 name,
643 cron_expr,
644 if paused { "yes" } else { "no" }
645 ));
646 }
647 lines.join("\n")
648}
649
650#[cfg_attr(coverage, allow(dead_code))]
652fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
653 use std::fmt::Write;
654 let tag = format!("{CRONTAB_TAG}{name}");
655 let filtered =
656 crontab
657 .lines()
658 .filter(|l| !l.contains(&tag))
659 .fold(String::new(), |mut acc, l| {
660 writeln!(acc, "{l}").unwrap();
661 acc
662 });
663 if filtered.len() == crontab.len() {
664 anyhow::bail!("schedule \"{name}\" not found");
665 }
666 Ok(filtered)
667}
668
669#[cfg_attr(coverage, allow(dead_code))]
671fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
672 use std::fmt::Write;
673 let tag = format!("{CRONTAB_TAG}{name}");
674 let mut found = false;
675 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
676 if l.contains(&tag) && !l.starts_with('#') {
677 found = true;
678 writeln!(acc, "#{l}").unwrap();
679 } else {
680 writeln!(acc, "{l}").unwrap();
681 }
682 acc
683 });
684 if !found {
685 anyhow::bail!("schedule \"{name}\" not found or already paused");
686 }
687 Ok(updated)
688}
689
690#[cfg_attr(coverage, allow(dead_code))]
692fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
693 use std::fmt::Write;
694 let tag = format!("{CRONTAB_TAG}{name}");
695 let mut found = false;
696 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
697 if l.contains(&tag) && l.starts_with('#') {
698 found = true;
699 writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
700 } else {
701 writeln!(acc, "{l}").unwrap();
702 }
703 acc
704 });
705 if !found {
706 anyhow::bail!("schedule \"{name}\" not found or not paused");
707 }
708 Ok(updated)
709}
710
711#[cfg(not(coverage))]
713fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
714 match action {
715 ScheduleAction::Install {
716 name,
717 cron,
718 workdir,
719 provider,
720 prompt,
721 } => {
722 let crontab = read_crontab()?;
723 let joined_prompt = prompt.join(" ");
724 let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
725 let updated = crontab_install(&crontab, name, &line)?;
726 write_crontab(&updated)?;
727 Ok(format!("Installed schedule \"{name}\""))
728 }
729 ScheduleAction::List => {
730 let crontab = read_crontab()?;
731 Ok(crontab_list(&crontab))
732 }
733 ScheduleAction::Remove { name } => {
734 let crontab = read_crontab()?;
735 let updated = crontab_remove(&crontab, name)?;
736 write_crontab(&updated)?;
737 Ok(format!("Removed schedule \"{name}\""))
738 }
739 ScheduleAction::Pause { name } => {
740 let crontab = read_crontab()?;
741 let updated = crontab_pause(&crontab, name)?;
742 write_crontab(&updated)?;
743 Ok(format!("Paused schedule \"{name}\""))
744 }
745 ScheduleAction::Resume { name } => {
746 let crontab = read_crontab()?;
747 let updated = crontab_resume(&crontab, name)?;
748 write_crontab(&updated)?;
749 Ok(format!("Resumed schedule \"{name}\""))
750 }
751 }
752}
753
754#[cfg(coverage)]
756fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
757 Ok(String::new())
758}
759
760#[allow(clippy::too_many_lines)]
762pub async fn execute(cli: &Cli) -> Result<String> {
763 let url = base_url(&cli.node);
764 let client = reqwest::Client::new();
765 let node = &cli.node;
766 let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;
767
768 match &cli.command {
769 Commands::Attach { name } => {
770 attach_session(name)?;
771 Ok(format!("Detached from session {name}."))
772 }
773 Commands::Input { name, text } => {
774 let input_text = text.as_deref().unwrap_or("\n");
775 let body = serde_json::json!({ "text": input_text });
776 let resp = authed_post(
777 &client,
778 format!("{url}/api/v1/sessions/{name}/input"),
779 token.as_deref(),
780 )
781 .json(&body)
782 .send()
783 .await
784 .map_err(|e| friendly_error(&e, node))?;
785 ok_or_api_error(resp).await?;
786 Ok(format!("Sent input to session {name}."))
787 }
788 Commands::List => {
789 let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
790 .send()
791 .await
792 .map_err(|e| friendly_error(&e, node))?;
793 let text = ok_or_api_error(resp).await?;
794 let sessions: Vec<Session> = serde_json::from_str(&text)?;
795 Ok(format_sessions(&sessions))
796 }
797 Commands::Nodes => {
798 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
799 .send()
800 .await
801 .map_err(|e| friendly_error(&e, node))?;
802 let text = ok_or_api_error(resp).await?;
803 let resp: PeersResponse = serde_json::from_str(&text)?;
804 Ok(format_nodes(&resp))
805 }
806 Commands::Spawn {
807 workdir,
808 name,
809 provider,
810 auto,
811 guard,
812 model,
813 system_prompt,
814 allowed_tools,
815 persona,
816 max_turns,
817 max_budget,
818 output_format,
819 prompt,
820 } => {
821 let prompt_text = prompt.join(" ");
822 let mode = if *auto { "autonomous" } else { "interactive" };
823 let mut body = serde_json::json!({
824 "workdir": workdir,
825 "provider": provider,
826 "prompt": prompt_text,
827 "mode": mode,
828 "guard_preset": guard,
829 });
830 if let Some(n) = name {
831 body["name"] = serde_json::json!(n);
832 }
833 if let Some(m) = model {
834 body["model"] = serde_json::json!(m);
835 }
836 if let Some(sp) = system_prompt {
837 body["system_prompt"] = serde_json::json!(sp);
838 }
839 if let Some(tools) = allowed_tools {
840 body["allowed_tools"] = serde_json::json!(tools);
841 }
842 if let Some(p) = persona {
843 body["persona"] = serde_json::json!(p);
844 }
845 if let Some(mt) = max_turns {
846 body["max_turns"] = serde_json::json!(mt);
847 }
848 if let Some(mb) = max_budget {
849 body["max_budget_usd"] = serde_json::json!(mb);
850 }
851 if let Some(of) = output_format {
852 body["output_format"] = serde_json::json!(of);
853 }
854 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
855 .json(&body)
856 .send()
857 .await
858 .map_err(|e| friendly_error(&e, node))?;
859 let text = ok_or_api_error(resp).await?;
860 let session: Session = serde_json::from_str(&text)?;
861 Ok(format!(
862 "Created session \"{}\" ({})",
863 session.name, session.id
864 ))
865 }
866 Commands::Kill { name } => {
867 let resp = authed_post(
868 &client,
869 format!("{url}/api/v1/sessions/{name}/kill"),
870 token.as_deref(),
871 )
872 .send()
873 .await
874 .map_err(|e| friendly_error(&e, node))?;
875 ok_or_api_error(resp).await?;
876 Ok(format!("Session {name} killed."))
877 }
878 Commands::Delete { name } => {
879 let resp = authed_delete(
880 &client,
881 format!("{url}/api/v1/sessions/{name}"),
882 token.as_deref(),
883 )
884 .send()
885 .await
886 .map_err(|e| friendly_error(&e, node))?;
887 ok_or_api_error(resp).await?;
888 Ok(format!("Session {name} deleted."))
889 }
890 Commands::Logs {
891 name,
892 lines,
893 follow,
894 } => {
895 if *follow {
896 let mut stdout = std::io::stdout();
897 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
898 .await
899 .map_err(|e| {
900 match e.downcast::<reqwest::Error>() {
902 Ok(re) => friendly_error(&re, node),
903 Err(other) => other,
904 }
905 })?;
906 Ok(String::new())
907 } else {
908 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
909 .await
910 .map_err(|e| match e.downcast::<reqwest::Error>() {
911 Ok(re) => friendly_error(&re, node),
912 Err(other) => other,
913 })?;
914 Ok(output)
915 }
916 }
917 Commands::Interventions { name } => {
918 let resp = authed_get(
919 &client,
920 format!("{url}/api/v1/sessions/{name}/interventions"),
921 token.as_deref(),
922 )
923 .send()
924 .await
925 .map_err(|e| friendly_error(&e, node))?;
926 let text = ok_or_api_error(resp).await?;
927 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
928 Ok(format_interventions(&events))
929 }
930 Commands::Ui => {
931 let dashboard = base_url(&cli.node);
932 open_browser(&dashboard)?;
933 Ok(format!("Opening {dashboard}"))
934 }
935 Commands::Resume { name } => {
936 let resp = authed_post(
937 &client,
938 format!("{url}/api/v1/sessions/{name}/resume"),
939 token.as_deref(),
940 )
941 .send()
942 .await
943 .map_err(|e| friendly_error(&e, node))?;
944 let text = ok_or_api_error(resp).await?;
945 let session: Session = serde_json::from_str(&text)?;
946 Ok(format!("Resumed session \"{}\"", session.name))
947 }
948 Commands::Schedule { action } => execute_schedule(action, node),
949 }
950}
951
952#[cfg(test)]
953mod tests {
954 use super::*;
955
956 #[test]
957 fn test_base_url() {
958 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
959 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
960 }
961
962 #[test]
963 fn test_cli_parse_list() {
964 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
965 assert_eq!(cli.node, "localhost:7433");
966 assert!(matches!(cli.command, Commands::List));
967 }
968
969 #[test]
970 fn test_cli_parse_nodes() {
971 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
972 assert!(matches!(cli.command, Commands::Nodes));
973 }
974
975 #[test]
976 fn test_cli_parse_ui() {
977 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
978 assert!(matches!(cli.command, Commands::Ui));
979 }
980
981 #[test]
982 fn test_cli_parse_ui_custom_node() {
983 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
984 assert!(matches!(cli.command, Commands::Ui));
985 assert_eq!(cli.node, "mac-mini:7433");
986 }
987
988 #[test]
989 fn test_build_open_command() {
990 let cmd = build_open_command("http://localhost:7433");
991 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
992 assert_eq!(args, vec!["http://localhost:7433"]);
993 #[cfg(target_os = "macos")]
994 assert_eq!(cmd.get_program(), "open");
995 #[cfg(target_os = "linux")]
996 assert_eq!(cmd.get_program(), "xdg-open");
997 }
998
999 #[test]
1000 fn test_cli_parse_spawn() {
1001 let cli = Cli::try_parse_from([
1002 "pulpo",
1003 "spawn",
1004 "--workdir",
1005 "/tmp/repo",
1006 "Fix",
1007 "the",
1008 "bug",
1009 ])
1010 .unwrap();
1011 assert!(matches!(
1012 &cli.command,
1013 Commands::Spawn { workdir, provider, auto, guard, prompt, .. }
1014 if workdir == "/tmp/repo" && provider == "claude" && !auto
1015 && guard == "standard" && prompt == &["Fix", "the", "bug"]
1016 ));
1017 }
1018
1019 #[test]
1020 fn test_cli_parse_spawn_with_provider() {
1021 let cli = Cli::try_parse_from([
1022 "pulpo",
1023 "spawn",
1024 "--workdir",
1025 "/tmp",
1026 "--provider",
1027 "codex",
1028 "Do it",
1029 ])
1030 .unwrap();
1031 assert!(matches!(
1032 &cli.command,
1033 Commands::Spawn { provider, .. } if provider == "codex"
1034 ));
1035 }
1036
1037 #[test]
1038 fn test_cli_parse_spawn_auto() {
1039 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "--auto", "Do it"])
1040 .unwrap();
1041 assert!(matches!(
1042 &cli.command,
1043 Commands::Spawn { auto, .. } if *auto
1044 ));
1045 }
1046
1047 #[test]
1048 fn test_cli_parse_spawn_with_guard() {
1049 let cli = Cli::try_parse_from([
1050 "pulpo",
1051 "spawn",
1052 "--workdir",
1053 "/tmp",
1054 "--guard",
1055 "strict",
1056 "Do it",
1057 ])
1058 .unwrap();
1059 assert!(matches!(
1060 &cli.command,
1061 Commands::Spawn { guard, .. } if guard == "strict"
1062 ));
1063 }
1064
1065 #[test]
1066 fn test_cli_parse_spawn_guard_default() {
1067 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "Do it"]).unwrap();
1068 assert!(matches!(
1069 &cli.command,
1070 Commands::Spawn { guard, .. } if guard == "standard"
1071 ));
1072 }
1073
1074 #[test]
1075 fn test_cli_parse_spawn_with_name() {
1076 let cli = Cli::try_parse_from([
1077 "pulpo",
1078 "spawn",
1079 "--workdir",
1080 "/tmp/repo",
1081 "--name",
1082 "my-task",
1083 "Fix it",
1084 ])
1085 .unwrap();
1086 assert!(matches!(
1087 &cli.command,
1088 Commands::Spawn { workdir, name, .. }
1089 if workdir == "/tmp/repo" && name.as_deref() == Some("my-task")
1090 ));
1091 }
1092
1093 #[test]
1094 fn test_cli_parse_spawn_without_name() {
1095 let cli =
1096 Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"]).unwrap();
1097 assert!(matches!(
1098 &cli.command,
1099 Commands::Spawn { name, .. } if name.is_none()
1100 ));
1101 }
1102
1103 #[test]
1104 fn test_cli_parse_logs() {
1105 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1106 assert!(matches!(
1107 &cli.command,
1108 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1109 ));
1110 }
1111
1112 #[test]
1113 fn test_cli_parse_logs_with_lines() {
1114 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1115 assert!(matches!(
1116 &cli.command,
1117 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 50 && !follow
1118 ));
1119 }
1120
1121 #[test]
1122 fn test_cli_parse_logs_follow() {
1123 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1124 assert!(matches!(
1125 &cli.command,
1126 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1127 ));
1128 }
1129
1130 #[test]
1131 fn test_cli_parse_logs_follow_short() {
1132 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1133 assert!(matches!(
1134 &cli.command,
1135 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1136 ));
1137 }
1138
1139 #[test]
1140 fn test_cli_parse_kill() {
1141 let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1142 assert!(matches!(
1143 &cli.command,
1144 Commands::Kill { name } if name == "my-session"
1145 ));
1146 }
1147
1148 #[test]
1149 fn test_cli_parse_delete() {
1150 let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1151 assert!(matches!(
1152 &cli.command,
1153 Commands::Delete { name } if name == "my-session"
1154 ));
1155 }
1156
1157 #[test]
1158 fn test_cli_parse_resume() {
1159 let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1160 assert!(matches!(
1161 &cli.command,
1162 Commands::Resume { name } if name == "my-session"
1163 ));
1164 }
1165
1166 #[test]
1167 fn test_cli_parse_input() {
1168 let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1169 assert!(matches!(
1170 &cli.command,
1171 Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("yes")
1172 ));
1173 }
1174
1175 #[test]
1176 fn test_cli_parse_input_no_text() {
1177 let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1178 assert!(matches!(
1179 &cli.command,
1180 Commands::Input { name, text } if name == "my-session" && text.is_none()
1181 ));
1182 }
1183
1184 #[test]
1185 fn test_cli_parse_input_alias() {
1186 let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1187 assert!(matches!(
1188 &cli.command,
1189 Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("y")
1190 ));
1191 }
1192
1193 #[test]
1194 fn test_cli_parse_custom_node() {
1195 let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1196 assert_eq!(cli.node, "win-pc:8080");
1197 }
1198
1199 #[test]
1200 fn test_cli_version() {
1201 let result = Cli::try_parse_from(["pulpo", "--version"]);
1202 let err = result.unwrap_err();
1204 assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1205 }
1206
1207 #[test]
1208 fn test_cli_parse_no_subcommand_fails() {
1209 let result = Cli::try_parse_from(["pulpo"]);
1210 assert!(result.is_err());
1211 }
1212
1213 #[test]
1214 fn test_cli_debug() {
1215 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1216 let debug = format!("{cli:?}");
1217 assert!(debug.contains("List"));
1218 }
1219
1220 #[test]
1221 fn test_commands_debug() {
1222 let cmd = Commands::List;
1223 assert_eq!(format!("{cmd:?}"), "List");
1224 }
1225
1226 const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","provider":"claude","prompt":"Fix bug","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1228
1229 async fn start_test_server() -> String {
1231 use axum::http::StatusCode;
1232 use axum::{
1233 Json, Router,
1234 routing::{delete, get, post},
1235 };
1236
1237 let session_json = TEST_SESSION_JSON;
1238
1239 let app = Router::new()
1240 .route(
1241 "/api/v1/sessions",
1242 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1243 (StatusCode::CREATED, session_json.to_owned())
1244 }),
1245 )
1246 .route(
1247 "/api/v1/sessions/{id}",
1248 delete(|| async { StatusCode::NO_CONTENT }),
1249 )
1250 .route(
1251 "/api/v1/sessions/{id}/kill",
1252 post(|| async { StatusCode::NO_CONTENT }),
1253 )
1254 .route(
1255 "/api/v1/sessions/{id}/output",
1256 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1257 )
1258 .route(
1259 "/api/v1/peers",
1260 get(|| async {
1261 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1262 }),
1263 )
1264 .route(
1265 "/api/v1/sessions/{id}/resume",
1266 axum::routing::post(move || async move { session_json.to_owned() }),
1267 )
1268 .route(
1269 "/api/v1/sessions/{id}/interventions",
1270 get(|| async { "[]".to_owned() }),
1271 )
1272 .route(
1273 "/api/v1/sessions/{id}/input",
1274 post(|| async { StatusCode::NO_CONTENT }),
1275 );
1276
1277 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1278 let addr = listener.local_addr().unwrap();
1279 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1280 format!("127.0.0.1:{}", addr.port())
1281 }
1282
1283 #[tokio::test]
1284 async fn test_execute_list_success() {
1285 let node = start_test_server().await;
1286 let cli = Cli {
1287 node,
1288 token: None,
1289 command: Commands::List,
1290 };
1291 let result = execute(&cli).await.unwrap();
1292 assert_eq!(result, "No sessions.");
1293 }
1294
1295 #[tokio::test]
1296 async fn test_execute_nodes_success() {
1297 let node = start_test_server().await;
1298 let cli = Cli {
1299 node,
1300 token: None,
1301 command: Commands::Nodes,
1302 };
1303 let result = execute(&cli).await.unwrap();
1304 assert!(result.contains("test"));
1305 assert!(result.contains("(local)"));
1306 assert!(result.contains("NAME"));
1307 }
1308
1309 #[tokio::test]
1310 async fn test_execute_spawn_success() {
1311 let node = start_test_server().await;
1312 let cli = Cli {
1313 node,
1314 token: None,
1315 command: Commands::Spawn {
1316 workdir: "/tmp/repo".into(),
1317 name: None,
1318 provider: "claude".into(),
1319 auto: false,
1320 guard: "standard".into(),
1321 model: None,
1322 system_prompt: None,
1323 allowed_tools: None,
1324 persona: None,
1325 max_turns: None,
1326 max_budget: None,
1327 output_format: None,
1328 prompt: vec!["Fix".into(), "bug".into()],
1329 },
1330 };
1331 let result = execute(&cli).await.unwrap();
1332 assert!(result.contains("Created session"));
1333 assert!(result.contains("repo"));
1334 }
1335
1336 #[tokio::test]
1337 async fn test_execute_spawn_with_all_new_flags() {
1338 let node = start_test_server().await;
1339 let cli = Cli {
1340 node,
1341 token: None,
1342 command: Commands::Spawn {
1343 workdir: "/tmp/repo".into(),
1344 name: None,
1345 provider: "claude".into(),
1346 auto: false,
1347 guard: "standard".into(),
1348 model: Some("opus".into()),
1349 system_prompt: Some("Be helpful".into()),
1350 allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1351 persona: Some("coder".into()),
1352 max_turns: Some(5),
1353 max_budget: Some(2.5),
1354 output_format: Some("json".into()),
1355 prompt: vec!["Fix".into(), "bug".into()],
1356 },
1357 };
1358 let result = execute(&cli).await.unwrap();
1359 assert!(result.contains("Created session"));
1360 }
1361
1362 #[tokio::test]
1363 async fn test_execute_spawn_auto_mode() {
1364 let node = start_test_server().await;
1365 let cli = Cli {
1366 node,
1367 token: None,
1368 command: Commands::Spawn {
1369 workdir: "/tmp/repo".into(),
1370 name: None,
1371 provider: "claude".into(),
1372 auto: true,
1373 guard: "standard".into(),
1374 model: None,
1375 system_prompt: None,
1376 allowed_tools: None,
1377 persona: None,
1378 max_turns: None,
1379 max_budget: None,
1380 output_format: None,
1381 prompt: vec!["Do it".into()],
1382 },
1383 };
1384 let result = execute(&cli).await.unwrap();
1385 assert!(result.contains("Created session"));
1386 }
1387
1388 #[tokio::test]
1389 async fn test_execute_spawn_with_name() {
1390 let node = start_test_server().await;
1391 let cli = Cli {
1392 node,
1393 token: None,
1394 command: Commands::Spawn {
1395 workdir: "/tmp/repo".into(),
1396 name: Some("my-task".into()),
1397 provider: "claude".into(),
1398 auto: false,
1399 guard: "standard".into(),
1400 model: None,
1401 system_prompt: None,
1402 allowed_tools: None,
1403 persona: None,
1404 max_turns: None,
1405 max_budget: None,
1406 output_format: None,
1407 prompt: vec!["Fix".into(), "bug".into()],
1408 },
1409 };
1410 let result = execute(&cli).await.unwrap();
1411 assert!(result.contains("Created session"));
1412 }
1413
1414 #[tokio::test]
1415 async fn test_execute_kill_success() {
1416 let node = start_test_server().await;
1417 let cli = Cli {
1418 node,
1419 token: None,
1420 command: Commands::Kill {
1421 name: "test-session".into(),
1422 },
1423 };
1424 let result = execute(&cli).await.unwrap();
1425 assert!(result.contains("killed"));
1426 }
1427
1428 #[tokio::test]
1429 async fn test_execute_delete_success() {
1430 let node = start_test_server().await;
1431 let cli = Cli {
1432 node,
1433 token: None,
1434 command: Commands::Delete {
1435 name: "test-session".into(),
1436 },
1437 };
1438 let result = execute(&cli).await.unwrap();
1439 assert!(result.contains("deleted"));
1440 }
1441
1442 #[tokio::test]
1443 async fn test_execute_logs_success() {
1444 let node = start_test_server().await;
1445 let cli = Cli {
1446 node,
1447 token: None,
1448 command: Commands::Logs {
1449 name: "test-session".into(),
1450 lines: 50,
1451 follow: false,
1452 },
1453 };
1454 let result = execute(&cli).await.unwrap();
1455 assert!(result.contains("test output"));
1456 }
1457
1458 #[tokio::test]
1459 async fn test_execute_list_connection_refused() {
1460 let cli = Cli {
1461 node: "localhost:1".into(),
1462 token: None,
1463 command: Commands::List,
1464 };
1465 let result = execute(&cli).await;
1466 let err = result.unwrap_err().to_string();
1467 assert!(
1468 err.contains("Could not connect to pulpod"),
1469 "Expected friendly error, got: {err}"
1470 );
1471 assert!(err.contains("localhost:1"));
1472 }
1473
1474 #[tokio::test]
1475 async fn test_execute_nodes_connection_refused() {
1476 let cli = Cli {
1477 node: "localhost:1".into(),
1478 token: None,
1479 command: Commands::Nodes,
1480 };
1481 let result = execute(&cli).await;
1482 let err = result.unwrap_err().to_string();
1483 assert!(err.contains("Could not connect to pulpod"));
1484 }
1485
1486 #[tokio::test]
1487 async fn test_execute_kill_error_response() {
1488 use axum::{Router, http::StatusCode, routing::post};
1489
1490 let app = Router::new().route(
1491 "/api/v1/sessions/{id}/kill",
1492 post(|| async {
1493 (
1494 StatusCode::NOT_FOUND,
1495 "{\"error\":\"session not found: test-session\"}",
1496 )
1497 }),
1498 );
1499 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1500 let addr = listener.local_addr().unwrap();
1501 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1502 let node = format!("127.0.0.1:{}", addr.port());
1503
1504 let cli = Cli {
1505 node,
1506 token: None,
1507 command: Commands::Kill {
1508 name: "test-session".into(),
1509 },
1510 };
1511 let err = execute(&cli).await.unwrap_err();
1512 assert_eq!(err.to_string(), "session not found: test-session");
1513 }
1514
1515 #[tokio::test]
1516 async fn test_execute_delete_error_response() {
1517 use axum::{Router, http::StatusCode, routing::delete};
1518
1519 let app = Router::new().route(
1520 "/api/v1/sessions/{id}",
1521 delete(|| async {
1522 (
1523 StatusCode::CONFLICT,
1524 "{\"error\":\"cannot delete session in 'running' state\"}",
1525 )
1526 }),
1527 );
1528 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1529 let addr = listener.local_addr().unwrap();
1530 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1531 let node = format!("127.0.0.1:{}", addr.port());
1532
1533 let cli = Cli {
1534 node,
1535 token: None,
1536 command: Commands::Delete {
1537 name: "test-session".into(),
1538 },
1539 };
1540 let err = execute(&cli).await.unwrap_err();
1541 assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1542 }
1543
1544 #[tokio::test]
1545 async fn test_execute_logs_error_response() {
1546 use axum::{Router, http::StatusCode, routing::get};
1547
1548 let app = Router::new().route(
1549 "/api/v1/sessions/{id}/output",
1550 get(|| async {
1551 (
1552 StatusCode::NOT_FOUND,
1553 "{\"error\":\"session not found: ghost\"}",
1554 )
1555 }),
1556 );
1557 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1558 let addr = listener.local_addr().unwrap();
1559 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1560 let node = format!("127.0.0.1:{}", addr.port());
1561
1562 let cli = Cli {
1563 node,
1564 token: None,
1565 command: Commands::Logs {
1566 name: "ghost".into(),
1567 lines: 50,
1568 follow: false,
1569 },
1570 };
1571 let err = execute(&cli).await.unwrap_err();
1572 assert_eq!(err.to_string(), "session not found: ghost");
1573 }
1574
1575 #[tokio::test]
1576 async fn test_execute_resume_error_response() {
1577 use axum::{Router, http::StatusCode, routing::post};
1578
1579 let app = Router::new().route(
1580 "/api/v1/sessions/{id}/resume",
1581 post(|| async {
1582 (
1583 StatusCode::BAD_REQUEST,
1584 "{\"error\":\"session is not stale (status: running)\"}",
1585 )
1586 }),
1587 );
1588 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1589 let addr = listener.local_addr().unwrap();
1590 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1591 let node = format!("127.0.0.1:{}", addr.port());
1592
1593 let cli = Cli {
1594 node,
1595 token: None,
1596 command: Commands::Resume {
1597 name: "test-session".into(),
1598 },
1599 };
1600 let err = execute(&cli).await.unwrap_err();
1601 assert_eq!(err.to_string(), "session is not stale (status: running)");
1602 }
1603
1604 #[tokio::test]
1605 async fn test_execute_spawn_error_response() {
1606 use axum::{Router, http::StatusCode, routing::post};
1607
1608 let app = Router::new().route(
1609 "/api/v1/sessions",
1610 post(|| async {
1611 (
1612 StatusCode::INTERNAL_SERVER_ERROR,
1613 "{\"error\":\"failed to spawn session\"}",
1614 )
1615 }),
1616 );
1617 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1618 let addr = listener.local_addr().unwrap();
1619 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1620 let node = format!("127.0.0.1:{}", addr.port());
1621
1622 let cli = Cli {
1623 node,
1624 token: None,
1625 command: Commands::Spawn {
1626 workdir: "/tmp/repo".into(),
1627 name: None,
1628 provider: "claude".into(),
1629 auto: false,
1630 guard: "standard".into(),
1631 model: None,
1632 system_prompt: None,
1633 allowed_tools: None,
1634 persona: None,
1635 max_turns: None,
1636 max_budget: None,
1637 output_format: None,
1638 prompt: vec!["test".into()],
1639 },
1640 };
1641 let err = execute(&cli).await.unwrap_err();
1642 assert_eq!(err.to_string(), "failed to spawn session");
1643 }
1644
1645 #[tokio::test]
1646 async fn test_execute_interventions_error_response() {
1647 use axum::{Router, http::StatusCode, routing::get};
1648
1649 let app = Router::new().route(
1650 "/api/v1/sessions/{id}/interventions",
1651 get(|| async {
1652 (
1653 StatusCode::NOT_FOUND,
1654 "{\"error\":\"session not found: ghost\"}",
1655 )
1656 }),
1657 );
1658 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1659 let addr = listener.local_addr().unwrap();
1660 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1661 let node = format!("127.0.0.1:{}", addr.port());
1662
1663 let cli = Cli {
1664 node,
1665 token: None,
1666 command: Commands::Interventions {
1667 name: "ghost".into(),
1668 },
1669 };
1670 let err = execute(&cli).await.unwrap_err();
1671 assert_eq!(err.to_string(), "session not found: ghost");
1672 }
1673
1674 #[tokio::test]
1675 async fn test_execute_resume_success() {
1676 let node = start_test_server().await;
1677 let cli = Cli {
1678 node,
1679 token: None,
1680 command: Commands::Resume {
1681 name: "test-session".into(),
1682 },
1683 };
1684 let result = execute(&cli).await.unwrap();
1685 assert!(result.contains("Resumed session"));
1686 assert!(result.contains("repo"));
1687 }
1688
1689 #[tokio::test]
1690 async fn test_execute_input_success() {
1691 let node = start_test_server().await;
1692 let cli = Cli {
1693 node,
1694 token: None,
1695 command: Commands::Input {
1696 name: "test-session".into(),
1697 text: Some("yes".into()),
1698 },
1699 };
1700 let result = execute(&cli).await.unwrap();
1701 assert!(result.contains("Sent input to session test-session"));
1702 }
1703
1704 #[tokio::test]
1705 async fn test_execute_input_no_text() {
1706 let node = start_test_server().await;
1707 let cli = Cli {
1708 node,
1709 token: None,
1710 command: Commands::Input {
1711 name: "test-session".into(),
1712 text: None,
1713 },
1714 };
1715 let result = execute(&cli).await.unwrap();
1716 assert!(result.contains("Sent input to session test-session"));
1717 }
1718
1719 #[tokio::test]
1720 async fn test_execute_input_connection_refused() {
1721 let cli = Cli {
1722 node: "localhost:1".into(),
1723 token: None,
1724 command: Commands::Input {
1725 name: "test".into(),
1726 text: Some("y".into()),
1727 },
1728 };
1729 let result = execute(&cli).await;
1730 let err = result.unwrap_err().to_string();
1731 assert!(err.contains("Could not connect to pulpod"));
1732 }
1733
1734 #[tokio::test]
1735 async fn test_execute_input_error_response() {
1736 use axum::{Router, http::StatusCode, routing::post};
1737
1738 let app = Router::new().route(
1739 "/api/v1/sessions/{id}/input",
1740 post(|| async {
1741 (
1742 StatusCode::NOT_FOUND,
1743 "{\"error\":\"session not found: ghost\"}",
1744 )
1745 }),
1746 );
1747 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1748 let addr = listener.local_addr().unwrap();
1749 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1750 let node = format!("127.0.0.1:{}", addr.port());
1751
1752 let cli = Cli {
1753 node,
1754 token: None,
1755 command: Commands::Input {
1756 name: "ghost".into(),
1757 text: Some("y".into()),
1758 },
1759 };
1760 let err = execute(&cli).await.unwrap_err();
1761 assert_eq!(err.to_string(), "session not found: ghost");
1762 }
1763
1764 #[tokio::test]
1765 async fn test_execute_ui() {
1766 let cli = Cli {
1767 node: "localhost:7433".into(),
1768 token: None,
1769 command: Commands::Ui,
1770 };
1771 let result = execute(&cli).await.unwrap();
1772 assert!(result.contains("Opening"));
1773 assert!(result.contains("http://localhost:7433"));
1774 }
1775
1776 #[tokio::test]
1777 async fn test_execute_ui_custom_node() {
1778 let cli = Cli {
1779 node: "mac-mini:7433".into(),
1780 token: None,
1781 command: Commands::Ui,
1782 };
1783 let result = execute(&cli).await.unwrap();
1784 assert!(result.contains("http://mac-mini:7433"));
1785 }
1786
1787 #[test]
1788 fn test_format_sessions_empty() {
1789 assert_eq!(format_sessions(&[]), "No sessions.");
1790 }
1791
1792 #[test]
1793 fn test_format_sessions_with_data() {
1794 use chrono::Utc;
1795 use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1796 use uuid::Uuid;
1797
1798 let sessions = vec![Session {
1799 id: Uuid::nil(),
1800 name: "my-api".into(),
1801 workdir: "/tmp/repo".into(),
1802 provider: Provider::Claude,
1803 prompt: "Fix the bug".into(),
1804 status: SessionStatus::Running,
1805 mode: SessionMode::Interactive,
1806 conversation_id: None,
1807 exit_code: None,
1808 backend_session_id: None,
1809 output_snapshot: None,
1810 guard_config: None,
1811 model: None,
1812 allowed_tools: None,
1813 system_prompt: None,
1814 metadata: None,
1815 persona: None,
1816 max_turns: None,
1817 max_budget_usd: None,
1818 output_format: None,
1819 intervention_reason: None,
1820 intervention_at: None,
1821 last_output_at: None,
1822 idle_since: None,
1823 waiting_for_input: false,
1824 created_at: Utc::now(),
1825 updated_at: Utc::now(),
1826 }];
1827 let output = format_sessions(&sessions);
1828 assert!(output.contains("NAME"));
1829 assert!(output.contains("my-api"));
1830 assert!(output.contains("running"));
1831 assert!(output.contains("claude"));
1832 assert!(output.contains("Fix the bug"));
1833 }
1834
1835 #[test]
1836 fn test_format_sessions_long_prompt_truncated() {
1837 use chrono::Utc;
1838 use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1839 use uuid::Uuid;
1840
1841 let sessions = vec![Session {
1842 id: Uuid::nil(),
1843 name: "test".into(),
1844 workdir: "/tmp".into(),
1845 provider: Provider::Codex,
1846 prompt: "A very long prompt that exceeds forty characters in total length".into(),
1847 status: SessionStatus::Completed,
1848 mode: SessionMode::Autonomous,
1849 conversation_id: None,
1850 exit_code: None,
1851 backend_session_id: None,
1852 output_snapshot: None,
1853 guard_config: None,
1854 model: None,
1855 allowed_tools: None,
1856 system_prompt: None,
1857 metadata: None,
1858 persona: None,
1859 max_turns: None,
1860 max_budget_usd: None,
1861 output_format: None,
1862 intervention_reason: None,
1863 intervention_at: None,
1864 last_output_at: None,
1865 idle_since: None,
1866 waiting_for_input: false,
1867 created_at: Utc::now(),
1868 updated_at: Utc::now(),
1869 }];
1870 let output = format_sessions(&sessions);
1871 assert!(output.contains("..."));
1872 }
1873
1874 #[test]
1875 fn test_format_sessions_waiting_for_input() {
1876 use chrono::Utc;
1877 use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1878 use uuid::Uuid;
1879
1880 let sessions = vec![Session {
1881 id: Uuid::nil(),
1882 name: "blocked".into(),
1883 workdir: "/tmp".into(),
1884 provider: Provider::Claude,
1885 prompt: "Fix bug".into(),
1886 status: SessionStatus::Running,
1887 mode: SessionMode::Interactive,
1888 conversation_id: None,
1889 exit_code: None,
1890 backend_session_id: None,
1891 output_snapshot: None,
1892 guard_config: None,
1893 model: None,
1894 allowed_tools: None,
1895 system_prompt: None,
1896 metadata: None,
1897 persona: None,
1898 max_turns: None,
1899 max_budget_usd: None,
1900 output_format: None,
1901 intervention_reason: None,
1902 intervention_at: None,
1903 last_output_at: None,
1904 idle_since: None,
1905 waiting_for_input: true,
1906 created_at: Utc::now(),
1907 updated_at: Utc::now(),
1908 }];
1909 let output = format_sessions(&sessions);
1910 assert!(output.contains("waiting"));
1911 assert!(!output.contains("running"));
1912 }
1913
1914 #[test]
1915 fn test_format_nodes() {
1916 use pulpo_common::node::NodeInfo;
1917 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
1918
1919 let resp = PeersResponse {
1920 local: NodeInfo {
1921 name: "mac-mini".into(),
1922 hostname: "h".into(),
1923 os: "macos".into(),
1924 arch: "arm64".into(),
1925 cpus: 8,
1926 memory_mb: 16384,
1927 gpu: None,
1928 },
1929 peers: vec![PeerInfo {
1930 name: "win-pc".into(),
1931 address: "win-pc:7433".into(),
1932 status: PeerStatus::Online,
1933 node_info: None,
1934 session_count: Some(3),
1935 source: PeerSource::Configured,
1936 }],
1937 };
1938 let output = format_nodes(&resp);
1939 assert!(output.contains("mac-mini"));
1940 assert!(output.contains("(local)"));
1941 assert!(output.contains("win-pc"));
1942 assert!(output.contains('3'));
1943 }
1944
1945 #[test]
1946 fn test_format_nodes_no_session_count() {
1947 use pulpo_common::node::NodeInfo;
1948 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
1949
1950 let resp = PeersResponse {
1951 local: NodeInfo {
1952 name: "local".into(),
1953 hostname: "h".into(),
1954 os: "linux".into(),
1955 arch: "x86_64".into(),
1956 cpus: 4,
1957 memory_mb: 8192,
1958 gpu: None,
1959 },
1960 peers: vec![PeerInfo {
1961 name: "peer".into(),
1962 address: "peer:7433".into(),
1963 status: PeerStatus::Offline,
1964 node_info: None,
1965 session_count: None,
1966 source: PeerSource::Configured,
1967 }],
1968 };
1969 let output = format_nodes(&resp);
1970 assert!(output.contains("offline"));
1971 let lines: Vec<&str> = output.lines().collect();
1973 assert!(lines[2].contains('-'));
1974 }
1975
1976 #[tokio::test]
1977 async fn test_execute_resume_connection_refused() {
1978 let cli = Cli {
1979 node: "localhost:1".into(),
1980 token: None,
1981 command: Commands::Resume {
1982 name: "test".into(),
1983 },
1984 };
1985 let result = execute(&cli).await;
1986 let err = result.unwrap_err().to_string();
1987 assert!(err.contains("Could not connect to pulpod"));
1988 }
1989
1990 #[tokio::test]
1991 async fn test_execute_spawn_connection_refused() {
1992 let cli = Cli {
1993 node: "localhost:1".into(),
1994 token: None,
1995 command: Commands::Spawn {
1996 workdir: "/tmp".into(),
1997 name: None,
1998 provider: "claude".into(),
1999 auto: false,
2000 guard: "standard".into(),
2001 model: None,
2002 system_prompt: None,
2003 allowed_tools: None,
2004 persona: None,
2005 max_turns: None,
2006 max_budget: None,
2007 output_format: None,
2008 prompt: vec!["test".into()],
2009 },
2010 };
2011 let result = execute(&cli).await;
2012 let err = result.unwrap_err().to_string();
2013 assert!(err.contains("Could not connect to pulpod"));
2014 }
2015
2016 #[tokio::test]
2017 async fn test_execute_kill_connection_refused() {
2018 let cli = Cli {
2019 node: "localhost:1".into(),
2020 token: None,
2021 command: Commands::Kill {
2022 name: "test".into(),
2023 },
2024 };
2025 let result = execute(&cli).await;
2026 let err = result.unwrap_err().to_string();
2027 assert!(err.contains("Could not connect to pulpod"));
2028 }
2029
2030 #[tokio::test]
2031 async fn test_execute_delete_connection_refused() {
2032 let cli = Cli {
2033 node: "localhost:1".into(),
2034 token: None,
2035 command: Commands::Delete {
2036 name: "test".into(),
2037 },
2038 };
2039 let result = execute(&cli).await;
2040 let err = result.unwrap_err().to_string();
2041 assert!(err.contains("Could not connect to pulpod"));
2042 }
2043
2044 #[tokio::test]
2045 async fn test_execute_logs_connection_refused() {
2046 let cli = Cli {
2047 node: "localhost:1".into(),
2048 token: None,
2049 command: Commands::Logs {
2050 name: "test".into(),
2051 lines: 50,
2052 follow: false,
2053 },
2054 };
2055 let result = execute(&cli).await;
2056 let err = result.unwrap_err().to_string();
2057 assert!(err.contains("Could not connect to pulpod"));
2058 }
2059
2060 #[tokio::test]
2061 async fn test_friendly_error_connect() {
2062 let err = reqwest::Client::new()
2064 .get("http://127.0.0.1:1")
2065 .send()
2066 .await
2067 .unwrap_err();
2068 let friendly = friendly_error(&err, "test-node:1");
2069 let msg = friendly.to_string();
2070 assert!(
2071 msg.contains("Could not connect"),
2072 "Expected connect message, got: {msg}"
2073 );
2074 }
2075
2076 #[tokio::test]
2077 async fn test_friendly_error_other() {
2078 let err = reqwest::Client::new()
2080 .get("http://[::invalid::url")
2081 .send()
2082 .await
2083 .unwrap_err();
2084 let friendly = friendly_error(&err, "bad-host");
2085 let msg = friendly.to_string();
2086 assert!(
2087 msg.contains("Network error"),
2088 "Expected network error message, got: {msg}"
2089 );
2090 assert!(msg.contains("bad-host"));
2091 }
2092
2093 #[test]
2096 fn test_is_localhost_variants() {
2097 assert!(is_localhost("localhost:7433"));
2098 assert!(is_localhost("127.0.0.1:7433"));
2099 assert!(is_localhost("[::1]:7433"));
2100 assert!(is_localhost("::1"));
2101 assert!(is_localhost("localhost"));
2102 assert!(!is_localhost("mac-mini:7433"));
2103 assert!(!is_localhost("192.168.1.100:7433"));
2104 }
2105
2106 #[test]
2107 fn test_authed_get_with_token() {
2108 let client = reqwest::Client::new();
2109 let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2110 .build()
2111 .unwrap();
2112 let auth = req
2113 .headers()
2114 .get("authorization")
2115 .unwrap()
2116 .to_str()
2117 .unwrap();
2118 assert_eq!(auth, "Bearer tok");
2119 }
2120
2121 #[test]
2122 fn test_authed_get_without_token() {
2123 let client = reqwest::Client::new();
2124 let req = authed_get(&client, "http://h:1/api".into(), None)
2125 .build()
2126 .unwrap();
2127 assert!(req.headers().get("authorization").is_none());
2128 }
2129
2130 #[test]
2131 fn test_authed_post_with_token() {
2132 let client = reqwest::Client::new();
2133 let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2134 .build()
2135 .unwrap();
2136 let auth = req
2137 .headers()
2138 .get("authorization")
2139 .unwrap()
2140 .to_str()
2141 .unwrap();
2142 assert_eq!(auth, "Bearer secret");
2143 }
2144
2145 #[test]
2146 fn test_authed_post_without_token() {
2147 let client = reqwest::Client::new();
2148 let req = authed_post(&client, "http://h:1/api".into(), None)
2149 .build()
2150 .unwrap();
2151 assert!(req.headers().get("authorization").is_none());
2152 }
2153
2154 #[test]
2155 fn test_authed_delete_with_token() {
2156 let client = reqwest::Client::new();
2157 let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2158 .build()
2159 .unwrap();
2160 let auth = req
2161 .headers()
2162 .get("authorization")
2163 .unwrap()
2164 .to_str()
2165 .unwrap();
2166 assert_eq!(auth, "Bearer del-tok");
2167 }
2168
2169 #[test]
2170 fn test_authed_delete_without_token() {
2171 let client = reqwest::Client::new();
2172 let req = authed_delete(&client, "http://h:1/api".into(), None)
2173 .build()
2174 .unwrap();
2175 assert!(req.headers().get("authorization").is_none());
2176 }
2177
2178 #[tokio::test]
2179 async fn test_resolve_token_explicit() {
2180 let client = reqwest::Client::new();
2181 let token =
2182 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2183 assert_eq!(token, Some("my-tok".into()));
2184 }
2185
2186 #[tokio::test]
2187 async fn test_resolve_token_remote_no_explicit() {
2188 let client = reqwest::Client::new();
2189 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2190 assert_eq!(token, None);
2191 }
2192
2193 #[tokio::test]
2194 async fn test_resolve_token_localhost_auto_discover() {
2195 use axum::{Json, Router, routing::get};
2196
2197 let app = Router::new().route(
2198 "/api/v1/auth/token",
2199 get(|| async {
2200 Json(AuthTokenResponse {
2201 token: "discovered".into(),
2202 })
2203 }),
2204 );
2205 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2206 let addr = listener.local_addr().unwrap();
2207 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2208
2209 let node = format!("localhost:{}", addr.port());
2210 let base = base_url(&node);
2211 let client = reqwest::Client::new();
2212 let token = resolve_token(&client, &base, &node, None).await;
2213 assert_eq!(token, Some("discovered".into()));
2214 }
2215
2216 #[tokio::test]
2217 async fn test_discover_token_empty_returns_none() {
2218 use axum::{Json, Router, routing::get};
2219
2220 let app = Router::new().route(
2221 "/api/v1/auth/token",
2222 get(|| async {
2223 Json(AuthTokenResponse {
2224 token: String::new(),
2225 })
2226 }),
2227 );
2228 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2229 let addr = listener.local_addr().unwrap();
2230 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2231
2232 let base = format!("http://127.0.0.1:{}", addr.port());
2233 let client = reqwest::Client::new();
2234 assert_eq!(discover_token(&client, &base).await, None);
2235 }
2236
2237 #[tokio::test]
2238 async fn test_discover_token_unreachable_returns_none() {
2239 let client = reqwest::Client::new();
2240 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2241 }
2242
2243 #[test]
2244 fn test_cli_parse_with_token() {
2245 let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
2246 assert_eq!(cli.token, Some("my-secret".into()));
2247 }
2248
2249 #[test]
2250 fn test_cli_parse_without_token() {
2251 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2252 assert_eq!(cli.token, None);
2253 }
2254
2255 #[tokio::test]
2256 async fn test_execute_with_explicit_token_sends_header() {
2257 use axum::{Router, extract::Request, http::StatusCode, routing::get};
2258
2259 let app = Router::new().route(
2260 "/api/v1/sessions",
2261 get(|req: Request| async move {
2262 let auth = req
2263 .headers()
2264 .get("authorization")
2265 .and_then(|v| v.to_str().ok())
2266 .unwrap_or("");
2267 assert_eq!(auth, "Bearer test-token");
2268 (StatusCode::OK, "[]".to_owned())
2269 }),
2270 );
2271 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2272 let addr = listener.local_addr().unwrap();
2273 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2274 let node = format!("127.0.0.1:{}", addr.port());
2275
2276 let cli = Cli {
2277 node,
2278 token: Some("test-token".into()),
2279 command: Commands::List,
2280 };
2281 let result = execute(&cli).await.unwrap();
2282 assert_eq!(result, "No sessions.");
2283 }
2284
2285 #[test]
2288 fn test_cli_parse_interventions() {
2289 let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
2290 assert!(matches!(
2291 &cli.command,
2292 Commands::Interventions { name } if name == "my-session"
2293 ));
2294 }
2295
2296 #[test]
2297 fn test_format_interventions_empty() {
2298 assert_eq!(format_interventions(&[]), "No intervention events.");
2299 }
2300
2301 #[test]
2302 fn test_format_interventions_with_data() {
2303 let events = vec![
2304 InterventionEventResponse {
2305 id: 1,
2306 session_id: "sess-1".into(),
2307 reason: "Memory exceeded threshold".into(),
2308 created_at: "2026-01-01T00:00:00Z".into(),
2309 },
2310 InterventionEventResponse {
2311 id: 2,
2312 session_id: "sess-1".into(),
2313 reason: "Idle for 10 minutes".into(),
2314 created_at: "2026-01-02T00:00:00Z".into(),
2315 },
2316 ];
2317 let output = format_interventions(&events);
2318 assert!(output.contains("ID"));
2319 assert!(output.contains("TIMESTAMP"));
2320 assert!(output.contains("REASON"));
2321 assert!(output.contains("Memory exceeded threshold"));
2322 assert!(output.contains("Idle for 10 minutes"));
2323 assert!(output.contains("2026-01-01T00:00:00Z"));
2324 }
2325
2326 #[tokio::test]
2327 async fn test_execute_interventions_empty() {
2328 let node = start_test_server().await;
2329 let cli = Cli {
2330 node,
2331 token: None,
2332 command: Commands::Interventions {
2333 name: "my-session".into(),
2334 },
2335 };
2336 let result = execute(&cli).await.unwrap();
2337 assert_eq!(result, "No intervention events.");
2338 }
2339
2340 #[tokio::test]
2341 async fn test_execute_interventions_with_data() {
2342 use axum::{Router, routing::get};
2343
2344 let app = Router::new().route(
2345 "/api/v1/sessions/{id}/interventions",
2346 get(|| async {
2347 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2348 .to_owned()
2349 }),
2350 );
2351 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2352 let addr = listener.local_addr().unwrap();
2353 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2354 let node = format!("127.0.0.1:{}", addr.port());
2355
2356 let cli = Cli {
2357 node,
2358 token: None,
2359 command: Commands::Interventions {
2360 name: "test".into(),
2361 },
2362 };
2363 let result = execute(&cli).await.unwrap();
2364 assert!(result.contains("OOM"));
2365 assert!(result.contains("2026-01-01T00:00:00Z"));
2366 }
2367
2368 #[tokio::test]
2369 async fn test_execute_interventions_connection_refused() {
2370 let cli = Cli {
2371 node: "localhost:1".into(),
2372 token: None,
2373 command: Commands::Interventions {
2374 name: "test".into(),
2375 },
2376 };
2377 let result = execute(&cli).await;
2378 let err = result.unwrap_err().to_string();
2379 assert!(err.contains("Could not connect to pulpod"));
2380 }
2381
2382 #[test]
2385 fn test_build_attach_command() {
2386 let cmd = build_attach_command("my-session");
2387 assert_eq!(cmd.get_program(), "tmux");
2388 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2389 assert_eq!(args, vec!["attach-session", "-t", "pulpo-my-session"]);
2390 }
2391
2392 #[test]
2393 fn test_cli_parse_attach() {
2394 let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2395 assert!(matches!(
2396 &cli.command,
2397 Commands::Attach { name } if name == "my-session"
2398 ));
2399 }
2400
2401 #[test]
2402 fn test_cli_parse_attach_alias() {
2403 let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2404 assert!(matches!(
2405 &cli.command,
2406 Commands::Attach { name } if name == "my-session"
2407 ));
2408 }
2409
2410 #[tokio::test]
2411 async fn test_execute_attach_success() {
2412 let cli = Cli {
2413 node: "localhost:7433".into(),
2414 token: None,
2415 command: Commands::Attach {
2416 name: "test-session".into(),
2417 },
2418 };
2419 let result = execute(&cli).await.unwrap();
2420 assert!(result.contains("Detached from session test-session"));
2421 }
2422
2423 #[test]
2426 fn test_cli_parse_alias_spawn() {
2427 let cli = Cli::try_parse_from(["pulpo", "s", "--workdir", "/tmp", "Do it"]).unwrap();
2428 assert!(matches!(&cli.command, Commands::Spawn { .. }));
2429 }
2430
2431 #[test]
2432 fn test_cli_parse_alias_list() {
2433 let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2434 assert!(matches!(cli.command, Commands::List));
2435 }
2436
2437 #[test]
2438 fn test_cli_parse_alias_logs() {
2439 let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2440 assert!(matches!(
2441 &cli.command,
2442 Commands::Logs { name, .. } if name == "my-session"
2443 ));
2444 }
2445
2446 #[test]
2447 fn test_cli_parse_alias_kill() {
2448 let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2449 assert!(matches!(
2450 &cli.command,
2451 Commands::Kill { name } if name == "my-session"
2452 ));
2453 }
2454
2455 #[test]
2456 fn test_cli_parse_alias_delete() {
2457 let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2458 assert!(matches!(
2459 &cli.command,
2460 Commands::Delete { name } if name == "my-session"
2461 ));
2462 }
2463
2464 #[test]
2465 fn test_cli_parse_alias_resume() {
2466 let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
2467 assert!(matches!(
2468 &cli.command,
2469 Commands::Resume { name } if name == "my-session"
2470 ));
2471 }
2472
2473 #[test]
2474 fn test_cli_parse_alias_nodes() {
2475 let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
2476 assert!(matches!(cli.command, Commands::Nodes));
2477 }
2478
2479 #[test]
2480 fn test_cli_parse_alias_interventions() {
2481 let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
2482 assert!(matches!(
2483 &cli.command,
2484 Commands::Interventions { name } if name == "my-session"
2485 ));
2486 }
2487
2488 #[test]
2489 fn test_api_error_json() {
2490 let err = api_error("{\"error\":\"session not found: foo\"}");
2491 assert_eq!(err.to_string(), "session not found: foo");
2492 }
2493
2494 #[test]
2495 fn test_api_error_plain_text() {
2496 let err = api_error("plain text error");
2497 assert_eq!(err.to_string(), "plain text error");
2498 }
2499
2500 #[test]
2503 fn test_diff_output_empty_prev() {
2504 assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
2505 }
2506
2507 #[test]
2508 fn test_diff_output_identical() {
2509 assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
2510 }
2511
2512 #[test]
2513 fn test_diff_output_new_lines_appended() {
2514 let prev = "line1\nline2";
2515 let new = "line1\nline2\nline3\nline4";
2516 assert_eq!(diff_output(prev, new), "line3\nline4");
2517 }
2518
2519 #[test]
2520 fn test_diff_output_scrolled_window() {
2521 let prev = "line1\nline2\nline3";
2523 let new = "line2\nline3\nline4";
2524 assert_eq!(diff_output(prev, new), "line4");
2525 }
2526
2527 #[test]
2528 fn test_diff_output_completely_different() {
2529 let prev = "aaa\nbbb";
2530 let new = "xxx\nyyy";
2531 assert_eq!(diff_output(prev, new), "xxx\nyyy");
2532 }
2533
2534 #[test]
2535 fn test_diff_output_last_line_matches_but_overlap_fails() {
2536 let prev = "aaa\ncommon";
2538 let new = "zzz\ncommon\nnew_line";
2539 assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
2543 }
2544
2545 #[test]
2546 fn test_diff_output_new_empty() {
2547 assert_eq!(diff_output("line1", ""), "");
2548 }
2549
2550 async fn start_follow_test_server() -> String {
2554 use axum::{Router, extract::Path, extract::Query, routing::get};
2555 use std::sync::Arc;
2556 use std::sync::atomic::{AtomicUsize, Ordering};
2557
2558 let call_count = Arc::new(AtomicUsize::new(0));
2559 let output_count = call_count.clone();
2560 let status_count = Arc::new(AtomicUsize::new(0));
2561 let status_count_inner = status_count.clone();
2562
2563 let app = Router::new()
2564 .route(
2565 "/api/v1/sessions/{id}/output",
2566 get(
2567 move |_path: Path<String>,
2568 _query: Query<std::collections::HashMap<String, String>>| {
2569 let count = output_count.clone();
2570 async move {
2571 let n = count.fetch_add(1, Ordering::SeqCst);
2572 let output = match n {
2573 0 => "line1\nline2".to_owned(),
2574 1 => "line1\nline2\nline3".to_owned(),
2575 _ => "line2\nline3\nline4".to_owned(),
2576 };
2577 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2578 }
2579 },
2580 ),
2581 )
2582 .route(
2583 "/api/v1/sessions/{id}",
2584 get(move |_path: Path<String>| {
2585 let count = status_count_inner.clone();
2586 async move {
2587 let n = count.fetch_add(1, Ordering::SeqCst);
2588 let status = if n < 2 { "running" } else { "completed" };
2589 format!(
2590 r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
2591 )
2592 }
2593 }),
2594 );
2595
2596 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2597 let addr = listener.local_addr().unwrap();
2598 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2599 format!("http://127.0.0.1:{}", addr.port())
2600 }
2601
2602 #[tokio::test]
2603 async fn test_follow_logs_polls_and_exits_on_completed() {
2604 let base = start_follow_test_server().await;
2605 let client = reqwest::Client::new();
2606 let mut buf = Vec::new();
2607
2608 follow_logs(&client, &base, "test", 100, None, &mut buf)
2609 .await
2610 .unwrap();
2611
2612 let output = String::from_utf8(buf).unwrap();
2613 assert!(output.contains("line1"));
2615 assert!(output.contains("line2"));
2616 assert!(output.contains("line3"));
2617 assert!(output.contains("line4"));
2618 }
2619
2620 #[tokio::test]
2621 async fn test_execute_logs_follow_success() {
2622 let base = start_follow_test_server().await;
2623 let node = base.strip_prefix("http://").unwrap().to_owned();
2625
2626 let cli = Cli {
2627 node,
2628 token: None,
2629 command: Commands::Logs {
2630 name: "test".into(),
2631 lines: 100,
2632 follow: true,
2633 },
2634 };
2635 let result = execute(&cli).await.unwrap();
2637 assert_eq!(result, "");
2638 }
2639
2640 #[tokio::test]
2641 async fn test_execute_logs_follow_connection_refused() {
2642 let cli = Cli {
2643 node: "localhost:1".into(),
2644 token: None,
2645 command: Commands::Logs {
2646 name: "test".into(),
2647 lines: 50,
2648 follow: true,
2649 },
2650 };
2651 let result = execute(&cli).await;
2652 let err = result.unwrap_err().to_string();
2653 assert!(
2654 err.contains("Could not connect to pulpod"),
2655 "Expected friendly error, got: {err}"
2656 );
2657 }
2658
2659 #[tokio::test]
2660 async fn test_follow_logs_exits_on_dead() {
2661 use axum::{Router, extract::Path, extract::Query, routing::get};
2662
2663 let app = Router::new()
2664 .route(
2665 "/api/v1/sessions/{id}/output",
2666 get(
2667 |_path: Path<String>,
2668 _query: Query<std::collections::HashMap<String, String>>| async {
2669 r#"{"output":"some output"}"#.to_owned()
2670 },
2671 ),
2672 )
2673 .route(
2674 "/api/v1/sessions/{id}",
2675 get(|_path: Path<String>| async {
2676 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2677 }),
2678 );
2679
2680 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2681 let addr = listener.local_addr().unwrap();
2682 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2683 let base = format!("http://127.0.0.1:{}", addr.port());
2684
2685 let client = reqwest::Client::new();
2686 let mut buf = Vec::new();
2687 follow_logs(&client, &base, "test", 100, None, &mut buf)
2688 .await
2689 .unwrap();
2690
2691 let output = String::from_utf8(buf).unwrap();
2692 assert!(output.contains("some output"));
2693 }
2694
2695 #[tokio::test]
2696 async fn test_follow_logs_exits_on_stale() {
2697 use axum::{Router, extract::Path, extract::Query, routing::get};
2698
2699 let app = Router::new()
2700 .route(
2701 "/api/v1/sessions/{id}/output",
2702 get(
2703 |_path: Path<String>,
2704 _query: Query<std::collections::HashMap<String, String>>| async {
2705 r#"{"output":"stale output"}"#.to_owned()
2706 },
2707 ),
2708 )
2709 .route(
2710 "/api/v1/sessions/{id}",
2711 get(|_path: Path<String>| async {
2712 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2713 }),
2714 );
2715
2716 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2717 let addr = listener.local_addr().unwrap();
2718 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2719 let base = format!("http://127.0.0.1:{}", addr.port());
2720
2721 let client = reqwest::Client::new();
2722 let mut buf = Vec::new();
2723 follow_logs(&client, &base, "test", 100, None, &mut buf)
2724 .await
2725 .unwrap();
2726
2727 let output = String::from_utf8(buf).unwrap();
2728 assert!(output.contains("stale output"));
2729 }
2730
2731 #[tokio::test]
2732 async fn test_execute_logs_follow_non_reqwest_error() {
2733 use axum::{Router, extract::Path, extract::Query, routing::get};
2734
2735 let app = Router::new()
2737 .route(
2738 "/api/v1/sessions/{id}/output",
2739 get(
2740 |_path: Path<String>,
2741 _query: Query<std::collections::HashMap<String, String>>| async {
2742 r#"{"output":"initial"}"#.to_owned()
2743 },
2744 ),
2745 )
2746 .route(
2747 "/api/v1/sessions/{id}",
2748 get(|_path: Path<String>| async { "not valid json".to_owned() }),
2749 );
2750
2751 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2752 let addr = listener.local_addr().unwrap();
2753 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2754 let node = format!("127.0.0.1:{}", addr.port());
2755
2756 let cli = Cli {
2757 node,
2758 token: None,
2759 command: Commands::Logs {
2760 name: "test".into(),
2761 lines: 100,
2762 follow: true,
2763 },
2764 };
2765 let err = execute(&cli).await.unwrap_err();
2766 let msg = err.to_string();
2768 assert!(
2769 msg.contains("expected ident"),
2770 "Expected serde parse error, got: {msg}"
2771 );
2772 }
2773
2774 #[test]
2775 fn test_cli_parse_spawn_with_guardrails() {
2776 let cli = Cli::try_parse_from([
2777 "pulpo",
2778 "spawn",
2779 "--workdir",
2780 "/tmp",
2781 "--max-turns",
2782 "10",
2783 "--max-budget",
2784 "5.5",
2785 "--output-format",
2786 "json",
2787 "Do it",
2788 ])
2789 .unwrap();
2790 assert!(matches!(
2791 &cli.command,
2792 Commands::Spawn { max_turns, max_budget, output_format, .. }
2793 if *max_turns == Some(10) && *max_budget == Some(5.5)
2794 && output_format.as_deref() == Some("json")
2795 ));
2796 }
2797
2798 #[tokio::test]
2799 async fn test_fetch_session_status_connection_error() {
2800 let client = reqwest::Client::new();
2801 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
2802 assert!(result.is_err());
2803 }
2804
2805 #[test]
2808 fn test_build_crontab_line() {
2809 let line = build_crontab_line(
2810 "nightly-review",
2811 "0 3 * * *",
2812 "/home/me/repo",
2813 "claude",
2814 "Review PRs",
2815 "localhost:7433",
2816 );
2817 assert_eq!(
2818 line,
2819 "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n"
2820 );
2821 }
2822
2823 #[test]
2824 fn test_crontab_install_success() {
2825 let crontab = "# existing cron\n0 * * * * echo hi\n";
2826 let line = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";
2827 let result = crontab_install(crontab, "my-job", line).unwrap();
2828 assert!(result.starts_with("# existing cron\n"));
2829 assert!(result.ends_with("#pulpo:my-job\n"));
2830 assert!(result.contains("echo hi"));
2831 }
2832
2833 #[test]
2834 fn test_crontab_install_duplicate_error() {
2835 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
2836 let line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
2837 let err = crontab_install(crontab, "my-job", line).unwrap_err();
2838 assert!(err.to_string().contains("already exists"));
2839 }
2840
2841 #[test]
2842 fn test_crontab_list_empty() {
2843 assert_eq!(crontab_list(""), "No pulpo schedules.");
2844 }
2845
2846 #[test]
2847 fn test_crontab_list_no_pulpo_entries() {
2848 assert_eq!(crontab_list("0 * * * * echo hi\n"), "No pulpo schedules.");
2849 }
2850
2851 #[test]
2852 fn test_crontab_list_with_entries() {
2853 let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
2854 let output = crontab_list(crontab);
2855 assert!(output.contains("NAME"));
2856 assert!(output.contains("CRON"));
2857 assert!(output.contains("PAUSED"));
2858 assert!(output.contains("nightly"));
2859 assert!(output.contains("0 3 * * *"));
2860 assert!(output.contains("no"));
2861 }
2862
2863 #[test]
2864 fn test_crontab_list_paused_entry() {
2865 let crontab = "#0 3 * * * pulpo spawn task #pulpo:paused-job\n";
2866 let output = crontab_list(crontab);
2867 assert!(output.contains("paused-job"));
2868 assert!(output.contains("yes"));
2869 }
2870
2871 #[test]
2872 fn test_crontab_list_short_line() {
2873 let crontab = "badcron #pulpo:broken\n";
2875 let output = crontab_list(crontab);
2876 assert!(output.contains("broken"));
2877 assert!(output.contains('?'));
2878 }
2879
2880 #[test]
2881 fn test_crontab_remove_success() {
2882 let crontab = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
2883 let result = crontab_remove(crontab, "my-job").unwrap();
2884 assert!(result.contains("echo hi"));
2885 assert!(!result.contains("my-job"));
2886 }
2887
2888 #[test]
2889 fn test_crontab_remove_not_found() {
2890 let crontab = "0 * * * * echo hi\n";
2891 let err = crontab_remove(crontab, "ghost").unwrap_err();
2892 assert!(err.to_string().contains("not found"));
2893 }
2894
2895 #[test]
2896 fn test_crontab_pause_success() {
2897 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
2898 let result = crontab_pause(crontab, "my-job").unwrap();
2899 assert!(result.starts_with('#'));
2900 assert!(result.contains("#pulpo:my-job"));
2901 }
2902
2903 #[test]
2904 fn test_crontab_pause_not_found() {
2905 let crontab = "0 * * * * echo hi\n";
2906 let err = crontab_pause(crontab, "ghost").unwrap_err();
2907 assert!(err.to_string().contains("not found or already paused"));
2908 }
2909
2910 #[test]
2911 fn test_crontab_pause_already_paused() {
2912 let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
2913 let err = crontab_pause(crontab, "my-job").unwrap_err();
2914 assert!(err.to_string().contains("already paused"));
2915 }
2916
2917 #[test]
2918 fn test_crontab_resume_success() {
2919 let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
2920 let result = crontab_resume(crontab, "my-job").unwrap();
2921 assert!(!result.starts_with('#'));
2922 assert!(result.contains("#pulpo:my-job"));
2923 }
2924
2925 #[test]
2926 fn test_crontab_resume_not_found() {
2927 let crontab = "0 * * * * echo hi\n";
2928 let err = crontab_resume(crontab, "ghost").unwrap_err();
2929 assert!(err.to_string().contains("not found or not paused"));
2930 }
2931
2932 #[test]
2933 fn test_crontab_resume_not_paused() {
2934 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
2935 let err = crontab_resume(crontab, "my-job").unwrap_err();
2936 assert!(err.to_string().contains("not paused"));
2937 }
2938
2939 #[test]
2942 fn test_cli_parse_schedule_install() {
2943 let cli = Cli::try_parse_from([
2944 "pulpo",
2945 "schedule",
2946 "install",
2947 "nightly",
2948 "0 3 * * *",
2949 "--workdir",
2950 "/tmp/repo",
2951 "Review",
2952 "PRs",
2953 ])
2954 .unwrap();
2955 assert!(matches!(
2956 &cli.command,
2957 Commands::Schedule {
2958 action: ScheduleAction::Install { name, cron, workdir, provider, prompt }
2959 } if name == "nightly" && cron == "0 3 * * *" && workdir == "/tmp/repo"
2960 && provider == "claude" && prompt == &["Review", "PRs"]
2961 ));
2962 }
2963
2964 #[test]
2965 fn test_cli_parse_schedule_list() {
2966 let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
2967 assert!(matches!(
2968 &cli.command,
2969 Commands::Schedule {
2970 action: ScheduleAction::List
2971 }
2972 ));
2973 }
2974
2975 #[test]
2976 fn test_cli_parse_schedule_remove() {
2977 let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
2978 assert!(matches!(
2979 &cli.command,
2980 Commands::Schedule {
2981 action: ScheduleAction::Remove { name }
2982 } if name == "nightly"
2983 ));
2984 }
2985
2986 #[test]
2987 fn test_cli_parse_schedule_pause() {
2988 let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
2989 assert!(matches!(
2990 &cli.command,
2991 Commands::Schedule {
2992 action: ScheduleAction::Pause { name }
2993 } if name == "nightly"
2994 ));
2995 }
2996
2997 #[test]
2998 fn test_cli_parse_schedule_resume() {
2999 let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3000 assert!(matches!(
3001 &cli.command,
3002 Commands::Schedule {
3003 action: ScheduleAction::Resume { name }
3004 } if name == "nightly"
3005 ));
3006 }
3007
3008 #[test]
3009 fn test_cli_parse_schedule_alias() {
3010 let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3011 assert!(matches!(
3012 &cli.command,
3013 Commands::Schedule {
3014 action: ScheduleAction::List
3015 }
3016 ));
3017 }
3018
3019 #[test]
3020 fn test_cli_parse_schedule_list_alias() {
3021 let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3022 assert!(matches!(
3023 &cli.command,
3024 Commands::Schedule {
3025 action: ScheduleAction::List
3026 }
3027 ));
3028 }
3029
3030 #[test]
3031 fn test_cli_parse_schedule_remove_alias() {
3032 let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3033 assert!(matches!(
3034 &cli.command,
3035 Commands::Schedule {
3036 action: ScheduleAction::Remove { name }
3037 } if name == "nightly"
3038 ));
3039 }
3040
3041 #[test]
3042 fn test_cli_parse_schedule_install_custom_provider() {
3043 let cli = Cli::try_parse_from([
3044 "pulpo",
3045 "schedule",
3046 "install",
3047 "daily",
3048 "0 9 * * *",
3049 "--workdir",
3050 "/tmp",
3051 "--provider",
3052 "codex",
3053 "Run tests",
3054 ])
3055 .unwrap();
3056 assert!(matches!(
3057 &cli.command,
3058 Commands::Schedule {
3059 action: ScheduleAction::Install { provider, .. }
3060 } if provider == "codex"
3061 ));
3062 }
3063
3064 #[tokio::test]
3065 async fn test_execute_schedule_via_execute() {
3066 let node = start_test_server().await;
3068 let cli = Cli {
3069 node,
3070 token: None,
3071 command: Commands::Schedule {
3072 action: ScheduleAction::List,
3073 },
3074 };
3075 let result = execute(&cli).await;
3076 assert!(result.is_ok() || result.is_err());
3078 }
3079
3080 #[test]
3081 fn test_schedule_action_debug() {
3082 let action = ScheduleAction::List;
3083 assert_eq!(format!("{action:?}"), "List");
3084 }
3085}