1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{AuthTokenResponse, InterventionEventResponse, PeersResponse};
4use pulpo_common::session::Session;
5
// Top-level command-line definition: global connection options plus the subcommand to run.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version
)]
pub struct Cli {
    /// Target node as `host:port`
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    /// Auth token sent as a bearer token (auto-discovered for localhost nodes when omitted)
    #[arg(long)]
    pub token: Option<String>,

    /// Subcommand to execute
    #[command(subcommand)]
    pub command: Commands,
}
24
// Subcommands of the CLI. Each variant maps to one arm of the dispatcher in `execute`.
#[derive(Subcommand, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    /// Attach to a session's terminal
    #[command(visible_alias = "a")]
    Attach {
        /// Session name
        name: String,
    },

    /// Send input to a session
    #[command(visible_alias = "i")]
    Input {
        /// Session name
        name: String,
        /// Text to send (a newline is sent when omitted)
        text: Option<String>,
    },

    /// Spawn a new agent session
    #[command(visible_alias = "s")]
    Spawn {
        /// Working directory for the session
        #[arg(long)]
        workdir: String,

        /// Session name (left out of the request when omitted)
        #[arg(long)]
        name: Option<String>,

        /// Agent provider
        #[arg(long, default_value = "claude")]
        provider: String,

        /// Run in autonomous mode instead of interactive mode
        #[arg(long)]
        auto: bool,

        /// Guard preset
        #[arg(long, default_value = "standard")]
        guard: String,

        /// Model forwarded to the provider
        #[arg(long)]
        model: Option<String>,

        /// System prompt forwarded to the provider
        #[arg(long)]
        system_prompt: Option<String>,

        /// Allowed tools (comma-separated)
        #[arg(long, value_delimiter = ',')]
        allowed_tools: Option<Vec<String>>,

        /// Ink name (sent as the `ink` field of the spawn request)
        #[arg(long)]
        ink: Option<String>,

        /// Maximum number of turns
        #[arg(long)]
        max_turns: Option<u32>,

        /// Maximum budget in USD
        #[arg(long)]
        max_budget: Option<f64>,

        /// Output format forwarded to the provider
        #[arg(long)]
        output_format: Option<String>,

        /// Prompt words (joined with single spaces)
        prompt: Vec<String>,
    },

    /// List sessions
    #[command(visible_alias = "ls")]
    List,

    /// Show session output
    #[command(visible_alias = "l")]
    Logs {
        /// Session name
        name: String,

        /// Number of output lines to request
        #[arg(long, default_value = "100")]
        lines: usize,

        /// Keep polling for new output until the session reaches a terminal status
        #[arg(short, long)]
        follow: bool,
    },

    /// Kill a session
    #[command(visible_alias = "k")]
    Kill {
        /// Session name
        name: String,
    },

    /// Delete a session
    #[command(visible_alias = "rm")]
    Delete {
        /// Session name
        name: String,
    },

    /// Resume a session
    #[command(visible_alias = "r")]
    Resume {
        /// Session name
        name: String,
    },

    /// List the local node and its peers
    #[command(visible_alias = "n")]
    Nodes,

    /// Show intervention events for a session
    #[command(visible_alias = "iv")]
    Interventions {
        /// Session name
        name: String,
    },

    /// Open the node's web address in the default browser
    Ui,

    /// Manage scheduled spawns stored in the user's crontab
    #[command(visible_alias = "sched")]
    Schedule {
        /// Schedule operation to perform
        #[command(subcommand)]
        action: ScheduleAction,
    },
}
160
// Operations of the `schedule` subcommand; each one reads and/or rewrites the user's crontab.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    /// Install a new scheduled spawn
    Install {
        /// Schedule name (used to tag the crontab entry)
        name: String,
        /// Cron expression, passed as a single argument
        cron: String,
        /// Working directory for the spawned session
        #[arg(long)]
        workdir: String,
        /// Agent provider
        #[arg(long, default_value = "claude")]
        provider: String,
        /// Prompt words (joined with single spaces)
        prompt: Vec<String>,
    },
    /// List installed schedules
    #[command(alias = "ls")]
    List,
    /// Remove a schedule
    #[command(alias = "rm")]
    Remove {
        /// Schedule name
        name: String,
    },
    /// Pause a schedule by commenting out its crontab entry
    Pause {
        /// Schedule name
        name: String,
    },
    /// Resume a paused schedule by uncommenting its crontab entry
    Resume {
        /// Schedule name
        name: String,
    },
}
198
/// Builds the HTTP base URL for a node given as `host:port`.
///
/// The scheme is always plain `http://`; `node` is appended verbatim.
pub fn base_url(node: &str) -> String {
    let mut url = String::from("http://");
    url.push_str(node);
    url
}
203
/// JSON body returned by the session output endpoint; only the `output` field is read.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// Captured session output text.
    output: String,
}
209
210fn format_sessions(sessions: &[Session]) -> String {
212 if sessions.is_empty() {
213 return "No sessions.".into();
214 }
215 let mut lines = vec![format!(
216 "{:<20} {:<12} {:<10} {:<14} {}",
217 "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
218 )];
219 for s in sessions {
220 let prompt_display = if s.prompt.len() > 40 {
221 format!("{}...", &s.prompt[..37])
222 } else {
223 s.prompt.clone()
224 };
225 let status_display = if s.waiting_for_input {
226 "waiting".to_owned()
227 } else {
228 s.status.to_string()
229 };
230 lines.push(format!(
231 "{:<20} {:<12} {:<10} {:<14} {}",
232 s.name, status_display, s.provider, s.mode, prompt_display
233 ));
234 }
235 lines.join("\n")
236}
237
238fn format_nodes(resp: &PeersResponse) -> String {
240 let mut lines = vec![format!(
241 "{:<20} {:<25} {:<10} {}",
242 "NAME", "ADDRESS", "STATUS", "SESSIONS"
243 )];
244 lines.push(format!(
245 "{:<20} {:<25} {:<10} {}",
246 resp.local.name, "(local)", "online", "-"
247 ));
248 for p in &resp.peers {
249 let sessions = p
250 .session_count
251 .map_or_else(|| "-".into(), |c| c.to_string());
252 lines.push(format!(
253 "{:<20} {:<25} {:<10} {}",
254 p.name, p.address, p.status, sessions
255 ));
256 }
257 lines.join("\n")
258}
259
260fn format_interventions(events: &[InterventionEventResponse]) -> String {
262 if events.is_empty() {
263 return "No intervention events.".into();
264 }
265 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
266 for e in events {
267 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
268 }
269 lines.join("\n")
270}
271
/// Builds (without running) the command that opens `url` with the platform's opener.
///
/// macOS uses `open`; Linux and every other target use `xdg-open`.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    let opener = if cfg!(target_os = "macos") {
        "open"
    } else {
        "xdg-open"
    };
    let mut command = std::process::Command::new(opener);
    command.arg(url);
    command
}
295
296#[cfg(not(coverage))]
298fn open_browser(url: &str) -> Result<()> {
299 build_open_command(url).status()?;
300 Ok(())
301}
302
/// Coverage-build stand-in for `open_browser`: launches nothing and always succeeds.
#[cfg(coverage)]
fn open_browser(_url: &str) -> Result<()> {
    Ok(())
}
308
/// Builds (without running) `tmux attach-session -t <backend_session_id>`.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    let mut tmux = std::process::Command::new("tmux");
    tmux.arg("attach-session")
        .arg("-t")
        .arg(backend_session_id);
    tmux
}
317
318#[cfg(not(any(test, coverage)))]
320fn attach_session(backend_session_id: &str) -> Result<()> {
321 let status = build_attach_command(backend_session_id).status()?;
322 if !status.success() {
323 anyhow::bail!("attach failed with {status}");
324 }
325 Ok(())
326}
327
/// Test/coverage stand-in for `attach_session`: never spawns tmux and always succeeds.
#[cfg(any(test, coverage))]
// The lints are silenced because this stub must keep the real function's signature.
#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    Ok(())
}
334
335fn api_error(text: &str) -> anyhow::Error {
337 serde_json::from_str::<serde_json::Value>(text)
338 .ok()
339 .and_then(|v| v["error"].as_str().map(String::from))
340 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
341}
342
343async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
345 if resp.status().is_success() {
346 Ok(resp.text().await?)
347 } else {
348 let text = resp.text().await?;
349 Err(api_error(&text))
350 }
351}
352
353fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
355 if err.is_connect() {
356 anyhow::anyhow!(
357 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
358 )
359 } else {
360 anyhow::anyhow!("Network error connecting to {node}: {err}")
361 }
362}
363
/// Reports whether `node` (a `host` or `host:port` string) refers to the local machine.
///
/// Recognizes `localhost`, `127.0.0.1`, a bare `::1`, and anything starting with `[::1]`.
fn is_localhost(node: &str) -> bool {
    // Everything before the first ':' is the host; without a ':' the whole string is.
    let host = node
        .split_once(':')
        .map_or(node, |(before_colon, _)| before_colon);
    matches!(host, "localhost" | "127.0.0.1") || node == "::1" || node.starts_with("[::1]")
}
369
370async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
372 let resp = client
373 .get(format!("{base}/api/v1/auth/token"))
374 .send()
375 .await
376 .ok()?;
377 let body: AuthTokenResponse = resp.json().await.ok()?;
378 if body.token.is_empty() {
379 None
380 } else {
381 Some(body.token)
382 }
383}
384
385async fn resolve_token(
387 client: &reqwest::Client,
388 base: &str,
389 node: &str,
390 explicit: Option<&str>,
391) -> Option<String> {
392 if let Some(t) = explicit {
393 return Some(t.to_owned());
394 }
395 if is_localhost(node) {
396 return discover_token(client, base).await;
397 }
398 None
399}
400
401fn authed_get(
403 client: &reqwest::Client,
404 url: String,
405 token: Option<&str>,
406) -> reqwest::RequestBuilder {
407 let req = client.get(url);
408 if let Some(t) = token {
409 req.bearer_auth(t)
410 } else {
411 req
412 }
413}
414
415fn authed_post(
417 client: &reqwest::Client,
418 url: String,
419 token: Option<&str>,
420) -> reqwest::RequestBuilder {
421 let req = client.post(url);
422 if let Some(t) = token {
423 req.bearer_auth(t)
424 } else {
425 req
426 }
427}
428
429fn authed_delete(
431 client: &reqwest::Client,
432 url: String,
433 token: Option<&str>,
434) -> reqwest::RequestBuilder {
435 let req = client.delete(url);
436 if let Some(t) = token {
437 req.bearer_auth(t)
438 } else {
439 req
440 }
441}
442
443async fn fetch_output(
445 client: &reqwest::Client,
446 base: &str,
447 name: &str,
448 lines: usize,
449 token: Option<&str>,
450) -> Result<String> {
451 let resp = authed_get(
452 client,
453 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
454 token,
455 )
456 .send()
457 .await?;
458 let text = ok_or_api_error(resp).await?;
459 let output: OutputResponse = serde_json::from_str(&text)?;
460 Ok(output.output)
461}
462
463async fn fetch_session_status(
465 client: &reqwest::Client,
466 base: &str,
467 name: &str,
468 token: Option<&str>,
469) -> Result<String> {
470 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
471 .send()
472 .await?;
473 let text = ok_or_api_error(resp).await?;
474 let session: Session = serde_json::from_str(&text)?;
475 Ok(session.status.to_string())
476}
477
/// Returns the part of `new` that was not already present at the end of `prev`.
///
/// Both arguments are successive snapshots of the same scrolling output. The function
/// looks for the last line of `prev` inside `new` (searching from the end), checks that
/// the lines leading up to it match the tail of `prev`, and returns everything in `new`
/// after that point.
///
/// * `prev` empty: all of `new` is returned.
/// * No overlap found: all of `new` is returned.
/// * `new` ends exactly where `prev` ended: `""` is returned.
///
/// Lines are compared without their terminators, so `\n` and `\r\n` endings are both
/// handled. The returned slice always starts at the beginning of a line of `new`.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    // A non-empty string always yields at least one line, so this only guards the invariant.
    let Some(&last_prev) = prev_lines.last() else {
        return new;
    };

    // Search from the end so that the most recent occurrence of the anchor line wins.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Compare as many trailing lines of `prev` as fit before (and including) index `i`.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        if new_lines[..=i].ends_with(prev_tail) {
            // Measure the consumed prefix with the terminators included. `lines()` strips
            // both "\n" and "\r\n", so assuming one terminator byte per line would
            // miscount on CRLF output and re-emit already-seen bytes.
            let consumed: usize = new
                .split_inclusive('\n')
                .take(i + 1)
                .map(str::len)
                .sum();
            return &new[consumed..];
        }
    }

    // No overlap: the window scrolled past everything seen before.
    new
}
520
521async fn follow_logs(
523 client: &reqwest::Client,
524 base: &str,
525 name: &str,
526 lines: usize,
527 token: Option<&str>,
528 writer: &mut (dyn std::io::Write + Send),
529) -> Result<()> {
530 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
531 write!(writer, "{prev_output}")?;
532
533 loop {
534 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
535
536 let status = fetch_session_status(client, base, name, token).await?;
538 let is_terminal = status == "completed" || status == "dead" || status == "stale";
539
540 let new_output = fetch_output(client, base, name, lines, token).await?;
542
543 let diff = diff_output(&prev_output, &new_output);
544 if !diff.is_empty() {
545 write!(writer, "{diff}")?;
546 }
547 prev_output = new_output;
548
549 if is_terminal {
550 break;
551 }
552 }
553 Ok(())
554}
555
/// Marker that precedes the schedule name at the end of each crontab line written by
/// `build_crontab_line`; the schedule helpers use it to find pulpo-managed entries.
#[cfg_attr(coverage, allow(dead_code))]
const CRONTAB_TAG: &str = "#pulpo:";
560
561#[cfg(not(coverage))]
563fn read_crontab() -> Result<String> {
564 let output = std::process::Command::new("crontab").arg("-l").output()?;
565 if output.status.success() {
566 Ok(String::from_utf8_lossy(&output.stdout).to_string())
567 } else {
568 Ok(String::new())
569 }
570}
571
572#[cfg(not(coverage))]
574fn write_crontab(content: &str) -> Result<()> {
575 use std::io::Write;
576 let mut child = std::process::Command::new("crontab")
577 .arg("-")
578 .stdin(std::process::Stdio::piped())
579 .spawn()?;
580 child
581 .stdin
582 .as_mut()
583 .unwrap()
584 .write_all(content.as_bytes())?;
585 let status = child.wait()?;
586 if !status.success() {
587 anyhow::bail!("crontab write failed");
588 }
589 Ok(())
590}
591
592#[cfg_attr(coverage, allow(dead_code))]
594fn build_crontab_line(
595 name: &str,
596 cron: &str,
597 workdir: &str,
598 provider: &str,
599 prompt: &str,
600 node: &str,
601) -> String {
602 format!(
603 "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
604 )
605}
606
607#[cfg_attr(coverage, allow(dead_code))]
609fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
610 let tag = format!("{CRONTAB_TAG}{name}");
611 if crontab.contains(&tag) {
612 anyhow::bail!("schedule \"{name}\" already exists — remove it first");
613 }
614 let mut result = crontab.to_owned();
615 result.push_str(line);
616 Ok(result)
617}
618
619#[cfg_attr(coverage, allow(dead_code))]
621fn crontab_list(crontab: &str) -> String {
622 let entries: Vec<&str> = crontab
623 .lines()
624 .filter(|l| l.contains(CRONTAB_TAG))
625 .collect();
626 if entries.is_empty() {
627 return "No pulpo schedules.".into();
628 }
629 let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
630 for entry in entries {
631 let paused = entry.starts_with('#');
632 let raw = entry.trim_start_matches('#').trim();
633 let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
634 let parts: Vec<&str> = raw.splitn(6, ' ').collect();
635 let cron_expr = if parts.len() >= 5 {
636 parts[..5].join(" ")
637 } else {
638 "?".into()
639 };
640 lines.push(format!(
641 "{:<20} {:<15} {}",
642 name,
643 cron_expr,
644 if paused { "yes" } else { "no" }
645 ));
646 }
647 lines.join("\n")
648}
649
650#[cfg_attr(coverage, allow(dead_code))]
652fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
653 use std::fmt::Write;
654 let tag = format!("{CRONTAB_TAG}{name}");
655 let filtered =
656 crontab
657 .lines()
658 .filter(|l| !l.contains(&tag))
659 .fold(String::new(), |mut acc, l| {
660 writeln!(acc, "{l}").unwrap();
661 acc
662 });
663 if filtered.len() == crontab.len() {
664 anyhow::bail!("schedule \"{name}\" not found");
665 }
666 Ok(filtered)
667}
668
669#[cfg_attr(coverage, allow(dead_code))]
671fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
672 use std::fmt::Write;
673 let tag = format!("{CRONTAB_TAG}{name}");
674 let mut found = false;
675 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
676 if l.contains(&tag) && !l.starts_with('#') {
677 found = true;
678 writeln!(acc, "#{l}").unwrap();
679 } else {
680 writeln!(acc, "{l}").unwrap();
681 }
682 acc
683 });
684 if !found {
685 anyhow::bail!("schedule \"{name}\" not found or already paused");
686 }
687 Ok(updated)
688}
689
690#[cfg_attr(coverage, allow(dead_code))]
692fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
693 use std::fmt::Write;
694 let tag = format!("{CRONTAB_TAG}{name}");
695 let mut found = false;
696 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
697 if l.contains(&tag) && l.starts_with('#') {
698 found = true;
699 writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
700 } else {
701 writeln!(acc, "{l}").unwrap();
702 }
703 acc
704 });
705 if !found {
706 anyhow::bail!("schedule \"{name}\" not found or not paused");
707 }
708 Ok(updated)
709}
710
711#[cfg(not(coverage))]
713fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
714 match action {
715 ScheduleAction::Install {
716 name,
717 cron,
718 workdir,
719 provider,
720 prompt,
721 } => {
722 let crontab = read_crontab()?;
723 let joined_prompt = prompt.join(" ");
724 let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
725 let updated = crontab_install(&crontab, name, &line)?;
726 write_crontab(&updated)?;
727 Ok(format!("Installed schedule \"{name}\""))
728 }
729 ScheduleAction::List => {
730 let crontab = read_crontab()?;
731 Ok(crontab_list(&crontab))
732 }
733 ScheduleAction::Remove { name } => {
734 let crontab = read_crontab()?;
735 let updated = crontab_remove(&crontab, name)?;
736 write_crontab(&updated)?;
737 Ok(format!("Removed schedule \"{name}\""))
738 }
739 ScheduleAction::Pause { name } => {
740 let crontab = read_crontab()?;
741 let updated = crontab_pause(&crontab, name)?;
742 write_crontab(&updated)?;
743 Ok(format!("Paused schedule \"{name}\""))
744 }
745 ScheduleAction::Resume { name } => {
746 let crontab = read_crontab()?;
747 let updated = crontab_resume(&crontab, name)?;
748 write_crontab(&updated)?;
749 Ok(format!("Resumed schedule \"{name}\""))
750 }
751 }
752}
753
/// Coverage-build stand-in for `execute_schedule`: touches no crontab and returns an
/// empty string.
#[cfg(coverage)]
fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
    Ok(String::new())
}
759
/// Executes the parsed CLI command and returns the text it produced.
///
/// Resolves the auth token first (explicit `--token`, otherwise discovery for localhost
/// nodes), then dispatches on the subcommand. Most subcommands issue one HTTP request
/// against the node's `/api/v1` endpoints and format the response; `ui` opens a
/// browser and `schedule` edits the local crontab.
///
/// # Errors
/// Fails on connection/network errors (reported via `friendly_error`), on non-success
/// API responses (reported via `api_error`), on unexpected response bodies, and on
/// failures of the local helpers (attach, browser, crontab).
// One match arm per subcommand keeps the dispatcher in a single place, hence the length.
#[allow(clippy::too_many_lines)]
pub async fn execute(cli: &Cli) -> Result<String> {
    let url = base_url(&cli.node);
    let client = reqwest::Client::new();
    let node = &cli.node;
    // Resolved once up front and shared by every arm that talks to the API.
    let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;

    match &cli.command {
        Commands::Attach { name } => {
            // Look the session up first to learn which backend session to attach to.
            let resp = authed_get(
                &client,
                format!("{url}/api/v1/sessions/{name}"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let session: Session = serde_json::from_str(&text)?;
            // Fall back to the `pulpo-<name>` id when the session has no backend id.
            let backend_id = session
                .backend_session_id
                .unwrap_or_else(|| format!("pulpo-{name}"));
            attach_session(&backend_id)?;
            Ok(format!("Detached from session {name}."))
        }
        Commands::Input { name, text } => {
            // Without explicit text, a single newline is sent.
            let input_text = text.as_deref().unwrap_or("\n");
            let body = serde_json::json!({ "text": input_text });
            let resp = authed_post(
                &client,
                format!("{url}/api/v1/sessions/{name}/input"),
                token.as_deref(),
            )
            .json(&body)
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            ok_or_api_error(resp).await?;
            Ok(format!("Sent input to session {name}."))
        }
        Commands::List => {
            let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let sessions: Vec<Session> = serde_json::from_str(&text)?;
            Ok(format_sessions(&sessions))
        }
        Commands::Nodes => {
            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let resp: PeersResponse = serde_json::from_str(&text)?;
            Ok(format_nodes(&resp))
        }
        Commands::Spawn {
            workdir,
            name,
            provider,
            auto,
            guard,
            model,
            system_prompt,
            allowed_tools,
            ink,
            max_turns,
            max_budget,
            output_format,
            prompt,
        } => {
            let prompt_text = prompt.join(" ");
            let mode = if *auto { "autonomous" } else { "interactive" };
            // Required fields go in up front; optional ones are added only when given,
            // so absent options are omitted from the request rather than sent as null.
            let mut body = serde_json::json!({
                "workdir": workdir,
                "provider": provider,
                "prompt": prompt_text,
                "mode": mode,
                "guard_preset": guard,
            });
            if let Some(n) = name {
                body["name"] = serde_json::json!(n);
            }
            if let Some(m) = model {
                body["model"] = serde_json::json!(m);
            }
            if let Some(sp) = system_prompt {
                body["system_prompt"] = serde_json::json!(sp);
            }
            if let Some(tools) = allowed_tools {
                body["allowed_tools"] = serde_json::json!(tools);
            }
            if let Some(p) = ink {
                body["ink"] = serde_json::json!(p);
            }
            if let Some(mt) = max_turns {
                body["max_turns"] = serde_json::json!(mt);
            }
            if let Some(mb) = max_budget {
                body["max_budget_usd"] = serde_json::json!(mb);
            }
            if let Some(of) = output_format {
                body["output_format"] = serde_json::json!(of);
            }
            let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
                .json(&body)
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let session: Session = serde_json::from_str(&text)?;
            Ok(format!(
                "Created session \"{}\" ({})",
                session.name, session.id
            ))
        }
        Commands::Kill { name } => {
            let resp = authed_post(
                &client,
                format!("{url}/api/v1/sessions/{name}/kill"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            ok_or_api_error(resp).await?;
            Ok(format!("Session {name} killed."))
        }
        Commands::Delete { name } => {
            let resp = authed_delete(
                &client,
                format!("{url}/api/v1/sessions/{name}"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            ok_or_api_error(resp).await?;
            Ok(format!("Session {name} deleted."))
        }
        Commands::Logs {
            name,
            lines,
            follow,
        } => {
            if *follow {
                // Output is streamed straight to stdout, so the returned text is empty.
                let mut stdout = std::io::stdout();
                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
                    .await
                    .map_err(|e| {
                        // Only transport errors get the friendly rewrite; anything
                        // else is passed through unchanged.
                        match e.downcast::<reqwest::Error>() {
                            Ok(re) => friendly_error(&re, node),
                            Err(other) => other,
                        }
                    })?;
                Ok(String::new())
            } else {
                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
                    .await
                    .map_err(|e| match e.downcast::<reqwest::Error>() {
                        Ok(re) => friendly_error(&re, node),
                        Err(other) => other,
                    })?;
                Ok(output)
            }
        }
        Commands::Interventions { name } => {
            let resp = authed_get(
                &client,
                format!("{url}/api/v1/sessions/{name}/interventions"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
            Ok(format_interventions(&events))
        }
        Commands::Ui => {
            // The browser is pointed at the node's base URL.
            let dashboard = base_url(&cli.node);
            open_browser(&dashboard)?;
            Ok(format!("Opening {dashboard}"))
        }
        Commands::Resume { name } => {
            let resp = authed_post(
                &client,
                format!("{url}/api/v1/sessions/{name}/resume"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let session: Session = serde_json::from_str(&text)?;
            Ok(format!("Resumed session \"{}\"", session.name))
        }
        // Schedules live in the local crontab; the node address is only embedded in
        // the installed entry.
        Commands::Schedule { action } => execute_schedule(action, node),
    }
}
965
966#[cfg(test)]
967mod tests {
968 use super::*;
969
970 #[test]
971 fn test_base_url() {
972 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
973 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
974 }
975
976 #[test]
977 fn test_cli_parse_list() {
978 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
979 assert_eq!(cli.node, "localhost:7433");
980 assert!(matches!(cli.command, Commands::List));
981 }
982
983 #[test]
984 fn test_cli_parse_nodes() {
985 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
986 assert!(matches!(cli.command, Commands::Nodes));
987 }
988
989 #[test]
990 fn test_cli_parse_ui() {
991 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
992 assert!(matches!(cli.command, Commands::Ui));
993 }
994
995 #[test]
996 fn test_cli_parse_ui_custom_node() {
997 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
998 assert!(matches!(cli.command, Commands::Ui));
999 assert_eq!(cli.node, "mac-mini:7433");
1000 }
1001
1002 #[test]
1003 fn test_build_open_command() {
1004 let cmd = build_open_command("http://localhost:7433");
1005 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1006 assert_eq!(args, vec!["http://localhost:7433"]);
1007 #[cfg(target_os = "macos")]
1008 assert_eq!(cmd.get_program(), "open");
1009 #[cfg(target_os = "linux")]
1010 assert_eq!(cmd.get_program(), "xdg-open");
1011 }
1012
1013 #[test]
1014 fn test_cli_parse_spawn() {
1015 let cli = Cli::try_parse_from([
1016 "pulpo",
1017 "spawn",
1018 "--workdir",
1019 "/tmp/repo",
1020 "Fix",
1021 "the",
1022 "bug",
1023 ])
1024 .unwrap();
1025 assert!(matches!(
1026 &cli.command,
1027 Commands::Spawn { workdir, provider, auto, guard, prompt, .. }
1028 if workdir == "/tmp/repo" && provider == "claude" && !auto
1029 && guard == "standard" && prompt == &["Fix", "the", "bug"]
1030 ));
1031 }
1032
1033 #[test]
1034 fn test_cli_parse_spawn_with_provider() {
1035 let cli = Cli::try_parse_from([
1036 "pulpo",
1037 "spawn",
1038 "--workdir",
1039 "/tmp",
1040 "--provider",
1041 "codex",
1042 "Do it",
1043 ])
1044 .unwrap();
1045 assert!(matches!(
1046 &cli.command,
1047 Commands::Spawn { provider, .. } if provider == "codex"
1048 ));
1049 }
1050
1051 #[test]
1052 fn test_cli_parse_spawn_auto() {
1053 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "--auto", "Do it"])
1054 .unwrap();
1055 assert!(matches!(
1056 &cli.command,
1057 Commands::Spawn { auto, .. } if *auto
1058 ));
1059 }
1060
1061 #[test]
1062 fn test_cli_parse_spawn_with_guard() {
1063 let cli = Cli::try_parse_from([
1064 "pulpo",
1065 "spawn",
1066 "--workdir",
1067 "/tmp",
1068 "--guard",
1069 "strict",
1070 "Do it",
1071 ])
1072 .unwrap();
1073 assert!(matches!(
1074 &cli.command,
1075 Commands::Spawn { guard, .. } if guard == "strict"
1076 ));
1077 }
1078
1079 #[test]
1080 fn test_cli_parse_spawn_guard_default() {
1081 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "Do it"]).unwrap();
1082 assert!(matches!(
1083 &cli.command,
1084 Commands::Spawn { guard, .. } if guard == "standard"
1085 ));
1086 }
1087
1088 #[test]
1089 fn test_cli_parse_spawn_with_name() {
1090 let cli = Cli::try_parse_from([
1091 "pulpo",
1092 "spawn",
1093 "--workdir",
1094 "/tmp/repo",
1095 "--name",
1096 "my-task",
1097 "Fix it",
1098 ])
1099 .unwrap();
1100 assert!(matches!(
1101 &cli.command,
1102 Commands::Spawn { workdir, name, .. }
1103 if workdir == "/tmp/repo" && name.as_deref() == Some("my-task")
1104 ));
1105 }
1106
1107 #[test]
1108 fn test_cli_parse_spawn_without_name() {
1109 let cli =
1110 Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"]).unwrap();
1111 assert!(matches!(
1112 &cli.command,
1113 Commands::Spawn { name, .. } if name.is_none()
1114 ));
1115 }
1116
1117 #[test]
1118 fn test_cli_parse_logs() {
1119 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1120 assert!(matches!(
1121 &cli.command,
1122 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1123 ));
1124 }
1125
1126 #[test]
1127 fn test_cli_parse_logs_with_lines() {
1128 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1129 assert!(matches!(
1130 &cli.command,
1131 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 50 && !follow
1132 ));
1133 }
1134
1135 #[test]
1136 fn test_cli_parse_logs_follow() {
1137 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1138 assert!(matches!(
1139 &cli.command,
1140 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1141 ));
1142 }
1143
1144 #[test]
1145 fn test_cli_parse_logs_follow_short() {
1146 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1147 assert!(matches!(
1148 &cli.command,
1149 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1150 ));
1151 }
1152
1153 #[test]
1154 fn test_cli_parse_kill() {
1155 let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1156 assert!(matches!(
1157 &cli.command,
1158 Commands::Kill { name } if name == "my-session"
1159 ));
1160 }
1161
1162 #[test]
1163 fn test_cli_parse_delete() {
1164 let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1165 assert!(matches!(
1166 &cli.command,
1167 Commands::Delete { name } if name == "my-session"
1168 ));
1169 }
1170
1171 #[test]
1172 fn test_cli_parse_resume() {
1173 let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1174 assert!(matches!(
1175 &cli.command,
1176 Commands::Resume { name } if name == "my-session"
1177 ));
1178 }
1179
1180 #[test]
1181 fn test_cli_parse_input() {
1182 let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1183 assert!(matches!(
1184 &cli.command,
1185 Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("yes")
1186 ));
1187 }
1188
1189 #[test]
1190 fn test_cli_parse_input_no_text() {
1191 let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1192 assert!(matches!(
1193 &cli.command,
1194 Commands::Input { name, text } if name == "my-session" && text.is_none()
1195 ));
1196 }
1197
1198 #[test]
1199 fn test_cli_parse_input_alias() {
1200 let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1201 assert!(matches!(
1202 &cli.command,
1203 Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("y")
1204 ));
1205 }
1206
1207 #[test]
1208 fn test_cli_parse_custom_node() {
1209 let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1210 assert_eq!(cli.node, "win-pc:8080");
1211 }
1212
1213 #[test]
1214 fn test_cli_version() {
1215 let result = Cli::try_parse_from(["pulpo", "--version"]);
1216 let err = result.unwrap_err();
1218 assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1219 }
1220
1221 #[test]
1222 fn test_cli_parse_no_subcommand_fails() {
1223 let result = Cli::try_parse_from(["pulpo"]);
1224 assert!(result.is_err());
1225 }
1226
1227 #[test]
1228 fn test_cli_debug() {
1229 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1230 let debug = format!("{cli:?}");
1231 assert!(debug.contains("List"));
1232 }
1233
1234 #[test]
1235 fn test_commands_debug() {
1236 let cmd = Commands::List;
1237 assert_eq!(format!("{cmd:?}"), "List");
1238 }
1239
1240 const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","provider":"claude","prompt":"Fix bug","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1242
1243 async fn start_test_server() -> String {
1245 use axum::http::StatusCode;
1246 use axum::{
1247 Json, Router,
1248 routing::{get, post},
1249 };
1250
1251 let session_json = TEST_SESSION_JSON;
1252
1253 let app = Router::new()
1254 .route(
1255 "/api/v1/sessions",
1256 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1257 (StatusCode::CREATED, session_json.to_owned())
1258 }),
1259 )
1260 .route(
1261 "/api/v1/sessions/{id}",
1262 get(move || async move { session_json.to_owned() })
1263 .delete(|| async { StatusCode::NO_CONTENT }),
1264 )
1265 .route(
1266 "/api/v1/sessions/{id}/kill",
1267 post(|| async { StatusCode::NO_CONTENT }),
1268 )
1269 .route(
1270 "/api/v1/sessions/{id}/output",
1271 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1272 )
1273 .route(
1274 "/api/v1/peers",
1275 get(|| async {
1276 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1277 }),
1278 )
1279 .route(
1280 "/api/v1/sessions/{id}/resume",
1281 axum::routing::post(move || async move { session_json.to_owned() }),
1282 )
1283 .route(
1284 "/api/v1/sessions/{id}/interventions",
1285 get(|| async { "[]".to_owned() }),
1286 )
1287 .route(
1288 "/api/v1/sessions/{id}/input",
1289 post(|| async { StatusCode::NO_CONTENT }),
1290 );
1291
1292 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1293 let addr = listener.local_addr().unwrap();
1294 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1295 format!("127.0.0.1:{}", addr.port())
1296 }
1297
1298 #[tokio::test]
1299 async fn test_execute_list_success() {
1300 let node = start_test_server().await;
1301 let cli = Cli {
1302 node,
1303 token: None,
1304 command: Commands::List,
1305 };
1306 let result = execute(&cli).await.unwrap();
1307 assert_eq!(result, "No sessions.");
1308 }
1309
1310 #[tokio::test]
1311 async fn test_execute_nodes_success() {
1312 let node = start_test_server().await;
1313 let cli = Cli {
1314 node,
1315 token: None,
1316 command: Commands::Nodes,
1317 };
1318 let result = execute(&cli).await.unwrap();
1319 assert!(result.contains("test"));
1320 assert!(result.contains("(local)"));
1321 assert!(result.contains("NAME"));
1322 }
1323
1324 #[tokio::test]
1325 async fn test_execute_spawn_success() {
1326 let node = start_test_server().await;
1327 let cli = Cli {
1328 node,
1329 token: None,
1330 command: Commands::Spawn {
1331 workdir: "/tmp/repo".into(),
1332 name: None,
1333 provider: "claude".into(),
1334 auto: false,
1335 guard: "standard".into(),
1336 model: None,
1337 system_prompt: None,
1338 allowed_tools: None,
1339 ink: None,
1340 max_turns: None,
1341 max_budget: None,
1342 output_format: None,
1343 prompt: vec!["Fix".into(), "bug".into()],
1344 },
1345 };
1346 let result = execute(&cli).await.unwrap();
1347 assert!(result.contains("Created session"));
1348 assert!(result.contains("repo"));
1349 }
1350
1351 #[tokio::test]
1352 async fn test_execute_spawn_with_all_new_flags() {
1353 let node = start_test_server().await;
1354 let cli = Cli {
1355 node,
1356 token: None,
1357 command: Commands::Spawn {
1358 workdir: "/tmp/repo".into(),
1359 name: None,
1360 provider: "claude".into(),
1361 auto: false,
1362 guard: "standard".into(),
1363 model: Some("opus".into()),
1364 system_prompt: Some("Be helpful".into()),
1365 allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1366 ink: Some("coder".into()),
1367 max_turns: Some(5),
1368 max_budget: Some(2.5),
1369 output_format: Some("json".into()),
1370 prompt: vec!["Fix".into(), "bug".into()],
1371 },
1372 };
1373 let result = execute(&cli).await.unwrap();
1374 assert!(result.contains("Created session"));
1375 }
1376
1377 #[tokio::test]
1378 async fn test_execute_spawn_auto_mode() {
1379 let node = start_test_server().await;
1380 let cli = Cli {
1381 node,
1382 token: None,
1383 command: Commands::Spawn {
1384 workdir: "/tmp/repo".into(),
1385 name: None,
1386 provider: "claude".into(),
1387 auto: true,
1388 guard: "standard".into(),
1389 model: None,
1390 system_prompt: None,
1391 allowed_tools: None,
1392 ink: None,
1393 max_turns: None,
1394 max_budget: None,
1395 output_format: None,
1396 prompt: vec!["Do it".into()],
1397 },
1398 };
1399 let result = execute(&cli).await.unwrap();
1400 assert!(result.contains("Created session"));
1401 }
1402
1403 #[tokio::test]
1404 async fn test_execute_spawn_with_name() {
1405 let node = start_test_server().await;
1406 let cli = Cli {
1407 node,
1408 token: None,
1409 command: Commands::Spawn {
1410 workdir: "/tmp/repo".into(),
1411 name: Some("my-task".into()),
1412 provider: "claude".into(),
1413 auto: false,
1414 guard: "standard".into(),
1415 model: None,
1416 system_prompt: None,
1417 allowed_tools: None,
1418 ink: None,
1419 max_turns: None,
1420 max_budget: None,
1421 output_format: None,
1422 prompt: vec!["Fix".into(), "bug".into()],
1423 },
1424 };
1425 let result = execute(&cli).await.unwrap();
1426 assert!(result.contains("Created session"));
1427 }
1428
1429 #[tokio::test]
1430 async fn test_execute_kill_success() {
1431 let node = start_test_server().await;
1432 let cli = Cli {
1433 node,
1434 token: None,
1435 command: Commands::Kill {
1436 name: "test-session".into(),
1437 },
1438 };
1439 let result = execute(&cli).await.unwrap();
1440 assert!(result.contains("killed"));
1441 }
1442
1443 #[tokio::test]
1444 async fn test_execute_delete_success() {
1445 let node = start_test_server().await;
1446 let cli = Cli {
1447 node,
1448 token: None,
1449 command: Commands::Delete {
1450 name: "test-session".into(),
1451 },
1452 };
1453 let result = execute(&cli).await.unwrap();
1454 assert!(result.contains("deleted"));
1455 }
1456
1457 #[tokio::test]
1458 async fn test_execute_logs_success() {
1459 let node = start_test_server().await;
1460 let cli = Cli {
1461 node,
1462 token: None,
1463 command: Commands::Logs {
1464 name: "test-session".into(),
1465 lines: 50,
1466 follow: false,
1467 },
1468 };
1469 let result = execute(&cli).await.unwrap();
1470 assert!(result.contains("test output"));
1471 }
1472
1473 #[tokio::test]
1474 async fn test_execute_list_connection_refused() {
1475 let cli = Cli {
1476 node: "localhost:1".into(),
1477 token: None,
1478 command: Commands::List,
1479 };
1480 let result = execute(&cli).await;
1481 let err = result.unwrap_err().to_string();
1482 assert!(
1483 err.contains("Could not connect to pulpod"),
1484 "Expected friendly error, got: {err}"
1485 );
1486 assert!(err.contains("localhost:1"));
1487 }
1488
1489 #[tokio::test]
1490 async fn test_execute_nodes_connection_refused() {
1491 let cli = Cli {
1492 node: "localhost:1".into(),
1493 token: None,
1494 command: Commands::Nodes,
1495 };
1496 let result = execute(&cli).await;
1497 let err = result.unwrap_err().to_string();
1498 assert!(err.contains("Could not connect to pulpod"));
1499 }
1500
1501 #[tokio::test]
1502 async fn test_execute_kill_error_response() {
1503 use axum::{Router, http::StatusCode, routing::post};
1504
1505 let app = Router::new().route(
1506 "/api/v1/sessions/{id}/kill",
1507 post(|| async {
1508 (
1509 StatusCode::NOT_FOUND,
1510 "{\"error\":\"session not found: test-session\"}",
1511 )
1512 }),
1513 );
1514 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1515 let addr = listener.local_addr().unwrap();
1516 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1517 let node = format!("127.0.0.1:{}", addr.port());
1518
1519 let cli = Cli {
1520 node,
1521 token: None,
1522 command: Commands::Kill {
1523 name: "test-session".into(),
1524 },
1525 };
1526 let err = execute(&cli).await.unwrap_err();
1527 assert_eq!(err.to_string(), "session not found: test-session");
1528 }
1529
1530 #[tokio::test]
1531 async fn test_execute_delete_error_response() {
1532 use axum::{Router, http::StatusCode, routing::delete};
1533
1534 let app = Router::new().route(
1535 "/api/v1/sessions/{id}",
1536 delete(|| async {
1537 (
1538 StatusCode::CONFLICT,
1539 "{\"error\":\"cannot delete session in 'running' state\"}",
1540 )
1541 }),
1542 );
1543 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1544 let addr = listener.local_addr().unwrap();
1545 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1546 let node = format!("127.0.0.1:{}", addr.port());
1547
1548 let cli = Cli {
1549 node,
1550 token: None,
1551 command: Commands::Delete {
1552 name: "test-session".into(),
1553 },
1554 };
1555 let err = execute(&cli).await.unwrap_err();
1556 assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1557 }
1558
1559 #[tokio::test]
1560 async fn test_execute_logs_error_response() {
1561 use axum::{Router, http::StatusCode, routing::get};
1562
1563 let app = Router::new().route(
1564 "/api/v1/sessions/{id}/output",
1565 get(|| async {
1566 (
1567 StatusCode::NOT_FOUND,
1568 "{\"error\":\"session not found: ghost\"}",
1569 )
1570 }),
1571 );
1572 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1573 let addr = listener.local_addr().unwrap();
1574 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1575 let node = format!("127.0.0.1:{}", addr.port());
1576
1577 let cli = Cli {
1578 node,
1579 token: None,
1580 command: Commands::Logs {
1581 name: "ghost".into(),
1582 lines: 50,
1583 follow: false,
1584 },
1585 };
1586 let err = execute(&cli).await.unwrap_err();
1587 assert_eq!(err.to_string(), "session not found: ghost");
1588 }
1589
1590 #[tokio::test]
1591 async fn test_execute_resume_error_response() {
1592 use axum::{Router, http::StatusCode, routing::post};
1593
1594 let app = Router::new().route(
1595 "/api/v1/sessions/{id}/resume",
1596 post(|| async {
1597 (
1598 StatusCode::BAD_REQUEST,
1599 "{\"error\":\"session is not stale (status: running)\"}",
1600 )
1601 }),
1602 );
1603 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1604 let addr = listener.local_addr().unwrap();
1605 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1606 let node = format!("127.0.0.1:{}", addr.port());
1607
1608 let cli = Cli {
1609 node,
1610 token: None,
1611 command: Commands::Resume {
1612 name: "test-session".into(),
1613 },
1614 };
1615 let err = execute(&cli).await.unwrap_err();
1616 assert_eq!(err.to_string(), "session is not stale (status: running)");
1617 }
1618
1619 #[tokio::test]
1620 async fn test_execute_spawn_error_response() {
1621 use axum::{Router, http::StatusCode, routing::post};
1622
1623 let app = Router::new().route(
1624 "/api/v1/sessions",
1625 post(|| async {
1626 (
1627 StatusCode::INTERNAL_SERVER_ERROR,
1628 "{\"error\":\"failed to spawn session\"}",
1629 )
1630 }),
1631 );
1632 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1633 let addr = listener.local_addr().unwrap();
1634 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1635 let node = format!("127.0.0.1:{}", addr.port());
1636
1637 let cli = Cli {
1638 node,
1639 token: None,
1640 command: Commands::Spawn {
1641 workdir: "/tmp/repo".into(),
1642 name: None,
1643 provider: "claude".into(),
1644 auto: false,
1645 guard: "standard".into(),
1646 model: None,
1647 system_prompt: None,
1648 allowed_tools: None,
1649 ink: None,
1650 max_turns: None,
1651 max_budget: None,
1652 output_format: None,
1653 prompt: vec!["test".into()],
1654 },
1655 };
1656 let err = execute(&cli).await.unwrap_err();
1657 assert_eq!(err.to_string(), "failed to spawn session");
1658 }
1659
1660 #[tokio::test]
1661 async fn test_execute_interventions_error_response() {
1662 use axum::{Router, http::StatusCode, routing::get};
1663
1664 let app = Router::new().route(
1665 "/api/v1/sessions/{id}/interventions",
1666 get(|| async {
1667 (
1668 StatusCode::NOT_FOUND,
1669 "{\"error\":\"session not found: ghost\"}",
1670 )
1671 }),
1672 );
1673 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1674 let addr = listener.local_addr().unwrap();
1675 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1676 let node = format!("127.0.0.1:{}", addr.port());
1677
1678 let cli = Cli {
1679 node,
1680 token: None,
1681 command: Commands::Interventions {
1682 name: "ghost".into(),
1683 },
1684 };
1685 let err = execute(&cli).await.unwrap_err();
1686 assert_eq!(err.to_string(), "session not found: ghost");
1687 }
1688
1689 #[tokio::test]
1690 async fn test_execute_resume_success() {
1691 let node = start_test_server().await;
1692 let cli = Cli {
1693 node,
1694 token: None,
1695 command: Commands::Resume {
1696 name: "test-session".into(),
1697 },
1698 };
1699 let result = execute(&cli).await.unwrap();
1700 assert!(result.contains("Resumed session"));
1701 assert!(result.contains("repo"));
1702 }
1703
1704 #[tokio::test]
1705 async fn test_execute_input_success() {
1706 let node = start_test_server().await;
1707 let cli = Cli {
1708 node,
1709 token: None,
1710 command: Commands::Input {
1711 name: "test-session".into(),
1712 text: Some("yes".into()),
1713 },
1714 };
1715 let result = execute(&cli).await.unwrap();
1716 assert!(result.contains("Sent input to session test-session"));
1717 }
1718
1719 #[tokio::test]
1720 async fn test_execute_input_no_text() {
1721 let node = start_test_server().await;
1722 let cli = Cli {
1723 node,
1724 token: None,
1725 command: Commands::Input {
1726 name: "test-session".into(),
1727 text: None,
1728 },
1729 };
1730 let result = execute(&cli).await.unwrap();
1731 assert!(result.contains("Sent input to session test-session"));
1732 }
1733
1734 #[tokio::test]
1735 async fn test_execute_input_connection_refused() {
1736 let cli = Cli {
1737 node: "localhost:1".into(),
1738 token: None,
1739 command: Commands::Input {
1740 name: "test".into(),
1741 text: Some("y".into()),
1742 },
1743 };
1744 let result = execute(&cli).await;
1745 let err = result.unwrap_err().to_string();
1746 assert!(err.contains("Could not connect to pulpod"));
1747 }
1748
1749 #[tokio::test]
1750 async fn test_execute_input_error_response() {
1751 use axum::{Router, http::StatusCode, routing::post};
1752
1753 let app = Router::new().route(
1754 "/api/v1/sessions/{id}/input",
1755 post(|| async {
1756 (
1757 StatusCode::NOT_FOUND,
1758 "{\"error\":\"session not found: ghost\"}",
1759 )
1760 }),
1761 );
1762 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1763 let addr = listener.local_addr().unwrap();
1764 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1765 let node = format!("127.0.0.1:{}", addr.port());
1766
1767 let cli = Cli {
1768 node,
1769 token: None,
1770 command: Commands::Input {
1771 name: "ghost".into(),
1772 text: Some("y".into()),
1773 },
1774 };
1775 let err = execute(&cli).await.unwrap_err();
1776 assert_eq!(err.to_string(), "session not found: ghost");
1777 }
1778
1779 #[tokio::test]
1780 async fn test_execute_ui() {
1781 let cli = Cli {
1782 node: "localhost:7433".into(),
1783 token: None,
1784 command: Commands::Ui,
1785 };
1786 let result = execute(&cli).await.unwrap();
1787 assert!(result.contains("Opening"));
1788 assert!(result.contains("http://localhost:7433"));
1789 }
1790
1791 #[tokio::test]
1792 async fn test_execute_ui_custom_node() {
1793 let cli = Cli {
1794 node: "mac-mini:7433".into(),
1795 token: None,
1796 command: Commands::Ui,
1797 };
1798 let result = execute(&cli).await.unwrap();
1799 assert!(result.contains("http://mac-mini:7433"));
1800 }
1801
1802 #[test]
1803 fn test_format_sessions_empty() {
1804 assert_eq!(format_sessions(&[]), "No sessions.");
1805 }
1806
1807 #[test]
1808 fn test_format_sessions_with_data() {
1809 use chrono::Utc;
1810 use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1811 use uuid::Uuid;
1812
1813 let sessions = vec![Session {
1814 id: Uuid::nil(),
1815 name: "my-api".into(),
1816 workdir: "/tmp/repo".into(),
1817 provider: Provider::Claude,
1818 prompt: "Fix the bug".into(),
1819 status: SessionStatus::Running,
1820 mode: SessionMode::Interactive,
1821 conversation_id: None,
1822 exit_code: None,
1823 backend_session_id: None,
1824 output_snapshot: None,
1825 guard_config: None,
1826 model: None,
1827 allowed_tools: None,
1828 system_prompt: None,
1829 metadata: None,
1830 ink: None,
1831 max_turns: None,
1832 max_budget_usd: None,
1833 output_format: None,
1834 intervention_reason: None,
1835 intervention_at: None,
1836 last_output_at: None,
1837 idle_since: None,
1838 waiting_for_input: false,
1839 created_at: Utc::now(),
1840 updated_at: Utc::now(),
1841 }];
1842 let output = format_sessions(&sessions);
1843 assert!(output.contains("NAME"));
1844 assert!(output.contains("my-api"));
1845 assert!(output.contains("running"));
1846 assert!(output.contains("claude"));
1847 assert!(output.contains("Fix the bug"));
1848 }
1849
1850 #[test]
1851 fn test_format_sessions_long_prompt_truncated() {
1852 use chrono::Utc;
1853 use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1854 use uuid::Uuid;
1855
1856 let sessions = vec![Session {
1857 id: Uuid::nil(),
1858 name: "test".into(),
1859 workdir: "/tmp".into(),
1860 provider: Provider::Codex,
1861 prompt: "A very long prompt that exceeds forty characters in total length".into(),
1862 status: SessionStatus::Completed,
1863 mode: SessionMode::Autonomous,
1864 conversation_id: None,
1865 exit_code: None,
1866 backend_session_id: None,
1867 output_snapshot: None,
1868 guard_config: None,
1869 model: None,
1870 allowed_tools: None,
1871 system_prompt: None,
1872 metadata: None,
1873 ink: None,
1874 max_turns: None,
1875 max_budget_usd: None,
1876 output_format: None,
1877 intervention_reason: None,
1878 intervention_at: None,
1879 last_output_at: None,
1880 idle_since: None,
1881 waiting_for_input: false,
1882 created_at: Utc::now(),
1883 updated_at: Utc::now(),
1884 }];
1885 let output = format_sessions(&sessions);
1886 assert!(output.contains("..."));
1887 }
1888
1889 #[test]
1890 fn test_format_sessions_waiting_for_input() {
1891 use chrono::Utc;
1892 use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1893 use uuid::Uuid;
1894
1895 let sessions = vec![Session {
1896 id: Uuid::nil(),
1897 name: "blocked".into(),
1898 workdir: "/tmp".into(),
1899 provider: Provider::Claude,
1900 prompt: "Fix bug".into(),
1901 status: SessionStatus::Running,
1902 mode: SessionMode::Interactive,
1903 conversation_id: None,
1904 exit_code: None,
1905 backend_session_id: None,
1906 output_snapshot: None,
1907 guard_config: None,
1908 model: None,
1909 allowed_tools: None,
1910 system_prompt: None,
1911 metadata: None,
1912 ink: None,
1913 max_turns: None,
1914 max_budget_usd: None,
1915 output_format: None,
1916 intervention_reason: None,
1917 intervention_at: None,
1918 last_output_at: None,
1919 idle_since: None,
1920 waiting_for_input: true,
1921 created_at: Utc::now(),
1922 updated_at: Utc::now(),
1923 }];
1924 let output = format_sessions(&sessions);
1925 assert!(output.contains("waiting"));
1926 assert!(!output.contains("running"));
1927 }
1928
1929 #[test]
1930 fn test_format_nodes() {
1931 use pulpo_common::node::NodeInfo;
1932 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
1933
1934 let resp = PeersResponse {
1935 local: NodeInfo {
1936 name: "mac-mini".into(),
1937 hostname: "h".into(),
1938 os: "macos".into(),
1939 arch: "arm64".into(),
1940 cpus: 8,
1941 memory_mb: 16384,
1942 gpu: None,
1943 },
1944 peers: vec![PeerInfo {
1945 name: "win-pc".into(),
1946 address: "win-pc:7433".into(),
1947 status: PeerStatus::Online,
1948 node_info: None,
1949 session_count: Some(3),
1950 source: PeerSource::Configured,
1951 }],
1952 };
1953 let output = format_nodes(&resp);
1954 assert!(output.contains("mac-mini"));
1955 assert!(output.contains("(local)"));
1956 assert!(output.contains("win-pc"));
1957 assert!(output.contains('3'));
1958 }
1959
1960 #[test]
1961 fn test_format_nodes_no_session_count() {
1962 use pulpo_common::node::NodeInfo;
1963 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
1964
1965 let resp = PeersResponse {
1966 local: NodeInfo {
1967 name: "local".into(),
1968 hostname: "h".into(),
1969 os: "linux".into(),
1970 arch: "x86_64".into(),
1971 cpus: 4,
1972 memory_mb: 8192,
1973 gpu: None,
1974 },
1975 peers: vec![PeerInfo {
1976 name: "peer".into(),
1977 address: "peer:7433".into(),
1978 status: PeerStatus::Offline,
1979 node_info: None,
1980 session_count: None,
1981 source: PeerSource::Configured,
1982 }],
1983 };
1984 let output = format_nodes(&resp);
1985 assert!(output.contains("offline"));
1986 let lines: Vec<&str> = output.lines().collect();
1988 assert!(lines[2].contains('-'));
1989 }
1990
1991 #[tokio::test]
1992 async fn test_execute_resume_connection_refused() {
1993 let cli = Cli {
1994 node: "localhost:1".into(),
1995 token: None,
1996 command: Commands::Resume {
1997 name: "test".into(),
1998 },
1999 };
2000 let result = execute(&cli).await;
2001 let err = result.unwrap_err().to_string();
2002 assert!(err.contains("Could not connect to pulpod"));
2003 }
2004
2005 #[tokio::test]
2006 async fn test_execute_spawn_connection_refused() {
2007 let cli = Cli {
2008 node: "localhost:1".into(),
2009 token: None,
2010 command: Commands::Spawn {
2011 workdir: "/tmp".into(),
2012 name: None,
2013 provider: "claude".into(),
2014 auto: false,
2015 guard: "standard".into(),
2016 model: None,
2017 system_prompt: None,
2018 allowed_tools: None,
2019 ink: None,
2020 max_turns: None,
2021 max_budget: None,
2022 output_format: None,
2023 prompt: vec!["test".into()],
2024 },
2025 };
2026 let result = execute(&cli).await;
2027 let err = result.unwrap_err().to_string();
2028 assert!(err.contains("Could not connect to pulpod"));
2029 }
2030
2031 #[tokio::test]
2032 async fn test_execute_kill_connection_refused() {
2033 let cli = Cli {
2034 node: "localhost:1".into(),
2035 token: None,
2036 command: Commands::Kill {
2037 name: "test".into(),
2038 },
2039 };
2040 let result = execute(&cli).await;
2041 let err = result.unwrap_err().to_string();
2042 assert!(err.contains("Could not connect to pulpod"));
2043 }
2044
2045 #[tokio::test]
2046 async fn test_execute_delete_connection_refused() {
2047 let cli = Cli {
2048 node: "localhost:1".into(),
2049 token: None,
2050 command: Commands::Delete {
2051 name: "test".into(),
2052 },
2053 };
2054 let result = execute(&cli).await;
2055 let err = result.unwrap_err().to_string();
2056 assert!(err.contains("Could not connect to pulpod"));
2057 }
2058
2059 #[tokio::test]
2060 async fn test_execute_logs_connection_refused() {
2061 let cli = Cli {
2062 node: "localhost:1".into(),
2063 token: None,
2064 command: Commands::Logs {
2065 name: "test".into(),
2066 lines: 50,
2067 follow: false,
2068 },
2069 };
2070 let result = execute(&cli).await;
2071 let err = result.unwrap_err().to_string();
2072 assert!(err.contains("Could not connect to pulpod"));
2073 }
2074
2075 #[tokio::test]
2076 async fn test_friendly_error_connect() {
2077 let err = reqwest::Client::new()
2079 .get("http://127.0.0.1:1")
2080 .send()
2081 .await
2082 .unwrap_err();
2083 let friendly = friendly_error(&err, "test-node:1");
2084 let msg = friendly.to_string();
2085 assert!(
2086 msg.contains("Could not connect"),
2087 "Expected connect message, got: {msg}"
2088 );
2089 }
2090
2091 #[tokio::test]
2092 async fn test_friendly_error_other() {
2093 let err = reqwest::Client::new()
2095 .get("http://[::invalid::url")
2096 .send()
2097 .await
2098 .unwrap_err();
2099 let friendly = friendly_error(&err, "bad-host");
2100 let msg = friendly.to_string();
2101 assert!(
2102 msg.contains("Network error"),
2103 "Expected network error message, got: {msg}"
2104 );
2105 assert!(msg.contains("bad-host"));
2106 }
2107
2108 #[test]
2111 fn test_is_localhost_variants() {
2112 assert!(is_localhost("localhost:7433"));
2113 assert!(is_localhost("127.0.0.1:7433"));
2114 assert!(is_localhost("[::1]:7433"));
2115 assert!(is_localhost("::1"));
2116 assert!(is_localhost("localhost"));
2117 assert!(!is_localhost("mac-mini:7433"));
2118 assert!(!is_localhost("192.168.1.100:7433"));
2119 }
2120
2121 #[test]
2122 fn test_authed_get_with_token() {
2123 let client = reqwest::Client::new();
2124 let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2125 .build()
2126 .unwrap();
2127 let auth = req
2128 .headers()
2129 .get("authorization")
2130 .unwrap()
2131 .to_str()
2132 .unwrap();
2133 assert_eq!(auth, "Bearer tok");
2134 }
2135
2136 #[test]
2137 fn test_authed_get_without_token() {
2138 let client = reqwest::Client::new();
2139 let req = authed_get(&client, "http://h:1/api".into(), None)
2140 .build()
2141 .unwrap();
2142 assert!(req.headers().get("authorization").is_none());
2143 }
2144
2145 #[test]
2146 fn test_authed_post_with_token() {
2147 let client = reqwest::Client::new();
2148 let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2149 .build()
2150 .unwrap();
2151 let auth = req
2152 .headers()
2153 .get("authorization")
2154 .unwrap()
2155 .to_str()
2156 .unwrap();
2157 assert_eq!(auth, "Bearer secret");
2158 }
2159
2160 #[test]
2161 fn test_authed_post_without_token() {
2162 let client = reqwest::Client::new();
2163 let req = authed_post(&client, "http://h:1/api".into(), None)
2164 .build()
2165 .unwrap();
2166 assert!(req.headers().get("authorization").is_none());
2167 }
2168
2169 #[test]
2170 fn test_authed_delete_with_token() {
2171 let client = reqwest::Client::new();
2172 let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2173 .build()
2174 .unwrap();
2175 let auth = req
2176 .headers()
2177 .get("authorization")
2178 .unwrap()
2179 .to_str()
2180 .unwrap();
2181 assert_eq!(auth, "Bearer del-tok");
2182 }
2183
2184 #[test]
2185 fn test_authed_delete_without_token() {
2186 let client = reqwest::Client::new();
2187 let req = authed_delete(&client, "http://h:1/api".into(), None)
2188 .build()
2189 .unwrap();
2190 assert!(req.headers().get("authorization").is_none());
2191 }
2192
2193 #[tokio::test]
2194 async fn test_resolve_token_explicit() {
2195 let client = reqwest::Client::new();
2196 let token =
2197 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2198 assert_eq!(token, Some("my-tok".into()));
2199 }
2200
2201 #[tokio::test]
2202 async fn test_resolve_token_remote_no_explicit() {
2203 let client = reqwest::Client::new();
2204 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2205 assert_eq!(token, None);
2206 }
2207
2208 #[tokio::test]
2209 async fn test_resolve_token_localhost_auto_discover() {
2210 use axum::{Json, Router, routing::get};
2211
2212 let app = Router::new().route(
2213 "/api/v1/auth/token",
2214 get(|| async {
2215 Json(AuthTokenResponse {
2216 token: "discovered".into(),
2217 })
2218 }),
2219 );
2220 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2221 let addr = listener.local_addr().unwrap();
2222 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2223
2224 let node = format!("localhost:{}", addr.port());
2225 let base = base_url(&node);
2226 let client = reqwest::Client::new();
2227 let token = resolve_token(&client, &base, &node, None).await;
2228 assert_eq!(token, Some("discovered".into()));
2229 }
2230
2231 #[tokio::test]
2232 async fn test_discover_token_empty_returns_none() {
2233 use axum::{Json, Router, routing::get};
2234
2235 let app = Router::new().route(
2236 "/api/v1/auth/token",
2237 get(|| async {
2238 Json(AuthTokenResponse {
2239 token: String::new(),
2240 })
2241 }),
2242 );
2243 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2244 let addr = listener.local_addr().unwrap();
2245 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2246
2247 let base = format!("http://127.0.0.1:{}", addr.port());
2248 let client = reqwest::Client::new();
2249 assert_eq!(discover_token(&client, &base).await, None);
2250 }
2251
2252 #[tokio::test]
2253 async fn test_discover_token_unreachable_returns_none() {
2254 let client = reqwest::Client::new();
2255 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2256 }
2257
2258 #[test]
2259 fn test_cli_parse_with_token() {
2260 let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
2261 assert_eq!(cli.token, Some("my-secret".into()));
2262 }
2263
2264 #[test]
2265 fn test_cli_parse_without_token() {
2266 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2267 assert_eq!(cli.token, None);
2268 }
2269
2270 #[tokio::test]
2271 async fn test_execute_with_explicit_token_sends_header() {
2272 use axum::{Router, extract::Request, http::StatusCode, routing::get};
2273
2274 let app = Router::new().route(
2275 "/api/v1/sessions",
2276 get(|req: Request| async move {
2277 let auth = req
2278 .headers()
2279 .get("authorization")
2280 .and_then(|v| v.to_str().ok())
2281 .unwrap_or("");
2282 assert_eq!(auth, "Bearer test-token");
2283 (StatusCode::OK, "[]".to_owned())
2284 }),
2285 );
2286 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2287 let addr = listener.local_addr().unwrap();
2288 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2289 let node = format!("127.0.0.1:{}", addr.port());
2290
2291 let cli = Cli {
2292 node,
2293 token: Some("test-token".into()),
2294 command: Commands::List,
2295 };
2296 let result = execute(&cli).await.unwrap();
2297 assert_eq!(result, "No sessions.");
2298 }
2299
2300 #[test]
2303 fn test_cli_parse_interventions() {
2304 let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
2305 assert!(matches!(
2306 &cli.command,
2307 Commands::Interventions { name } if name == "my-session"
2308 ));
2309 }
2310
2311 #[test]
2312 fn test_format_interventions_empty() {
2313 assert_eq!(format_interventions(&[]), "No intervention events.");
2314 }
2315
2316 #[test]
2317 fn test_format_interventions_with_data() {
2318 let events = vec![
2319 InterventionEventResponse {
2320 id: 1,
2321 session_id: "sess-1".into(),
2322 reason: "Memory exceeded threshold".into(),
2323 created_at: "2026-01-01T00:00:00Z".into(),
2324 },
2325 InterventionEventResponse {
2326 id: 2,
2327 session_id: "sess-1".into(),
2328 reason: "Idle for 10 minutes".into(),
2329 created_at: "2026-01-02T00:00:00Z".into(),
2330 },
2331 ];
2332 let output = format_interventions(&events);
2333 assert!(output.contains("ID"));
2334 assert!(output.contains("TIMESTAMP"));
2335 assert!(output.contains("REASON"));
2336 assert!(output.contains("Memory exceeded threshold"));
2337 assert!(output.contains("Idle for 10 minutes"));
2338 assert!(output.contains("2026-01-01T00:00:00Z"));
2339 }
2340
2341 #[tokio::test]
2342 async fn test_execute_interventions_empty() {
2343 let node = start_test_server().await;
2344 let cli = Cli {
2345 node,
2346 token: None,
2347 command: Commands::Interventions {
2348 name: "my-session".into(),
2349 },
2350 };
2351 let result = execute(&cli).await.unwrap();
2352 assert_eq!(result, "No intervention events.");
2353 }
2354
2355 #[tokio::test]
2356 async fn test_execute_interventions_with_data() {
2357 use axum::{Router, routing::get};
2358
2359 let app = Router::new().route(
2360 "/api/v1/sessions/{id}/interventions",
2361 get(|| async {
2362 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2363 .to_owned()
2364 }),
2365 );
2366 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2367 let addr = listener.local_addr().unwrap();
2368 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2369 let node = format!("127.0.0.1:{}", addr.port());
2370
2371 let cli = Cli {
2372 node,
2373 token: None,
2374 command: Commands::Interventions {
2375 name: "test".into(),
2376 },
2377 };
2378 let result = execute(&cli).await.unwrap();
2379 assert!(result.contains("OOM"));
2380 assert!(result.contains("2026-01-01T00:00:00Z"));
2381 }
2382
2383 #[tokio::test]
2384 async fn test_execute_interventions_connection_refused() {
2385 let cli = Cli {
2386 node: "localhost:1".into(),
2387 token: None,
2388 command: Commands::Interventions {
2389 name: "test".into(),
2390 },
2391 };
2392 let result = execute(&cli).await;
2393 let err = result.unwrap_err().to_string();
2394 assert!(err.contains("Could not connect to pulpod"));
2395 }
2396
2397 #[test]
2400 fn test_build_attach_command() {
2401 let cmd = build_attach_command("pulpo-my-session");
2402 assert_eq!(cmd.get_program(), "tmux");
2403 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2404 assert_eq!(args, vec!["attach-session", "-t", "pulpo-my-session"]);
2405 }
2406
2407 #[test]
2408 fn test_cli_parse_attach() {
2409 let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2410 assert!(matches!(
2411 &cli.command,
2412 Commands::Attach { name } if name == "my-session"
2413 ));
2414 }
2415
2416 #[test]
2417 fn test_cli_parse_attach_alias() {
2418 let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2419 assert!(matches!(
2420 &cli.command,
2421 Commands::Attach { name } if name == "my-session"
2422 ));
2423 }
2424
2425 #[tokio::test]
2426 async fn test_execute_attach_success() {
2427 let node = start_test_server().await;
2428 let cli = Cli {
2429 node,
2430 token: None,
2431 command: Commands::Attach {
2432 name: "test-session".into(),
2433 },
2434 };
2435 let result = execute(&cli).await.unwrap();
2436 assert!(result.contains("Detached from session test-session"));
2437 }
2438
2439 #[tokio::test]
2440 async fn test_execute_attach_with_backend_session_id() {
2441 use axum::{Router, routing::get};
2442 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","provider":"claude","prompt":"test","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"pulpo-my-session","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2443 let app = Router::new().route(
2444 "/api/v1/sessions/{id}",
2445 get(move || async move { session_json.to_owned() }),
2446 );
2447 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2448 let addr = listener.local_addr().unwrap();
2449 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2450
2451 let cli = Cli {
2452 node: format!("127.0.0.1:{}", addr.port()),
2453 token: None,
2454 command: Commands::Attach {
2455 name: "my-session".into(),
2456 },
2457 };
2458 let result = execute(&cli).await.unwrap();
2459 assert!(result.contains("Detached from session my-session"));
2460 }
2461
2462 #[tokio::test]
2463 async fn test_execute_attach_connection_refused() {
2464 let cli = Cli {
2465 node: "localhost:1".into(),
2466 token: None,
2467 command: Commands::Attach {
2468 name: "test-session".into(),
2469 },
2470 };
2471 let result = execute(&cli).await;
2472 let err = result.unwrap_err().to_string();
2473 assert!(err.contains("Could not connect to pulpod"));
2474 }
2475
2476 #[tokio::test]
2477 async fn test_execute_attach_error_response() {
2478 use axum::{Router, http::StatusCode, routing::get};
2479 let app = Router::new().route(
2480 "/api/v1/sessions/{id}",
2481 get(|| async {
2482 (
2483 StatusCode::NOT_FOUND,
2484 r#"{"error":"session not found"}"#.to_owned(),
2485 )
2486 }),
2487 );
2488 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2489 let addr = listener.local_addr().unwrap();
2490 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2491
2492 let cli = Cli {
2493 node: format!("127.0.0.1:{}", addr.port()),
2494 token: None,
2495 command: Commands::Attach {
2496 name: "nonexistent".into(),
2497 },
2498 };
2499 let result = execute(&cli).await;
2500 let err = result.unwrap_err().to_string();
2501 assert!(err.contains("session not found"));
2502 }
2503
/// The alias `s` parses as `Commands::Spawn`.
#[test]
fn test_cli_parse_alias_spawn() {
    let argv = ["pulpo", "s", "--workdir", "/tmp", "Do it"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_spawn = matches!(&parsed.command, Commands::Spawn { .. });
    assert!(is_spawn);
}
2511
/// The alias `ls` parses as `Commands::List`.
#[test]
fn test_cli_parse_alias_list() {
    let parsed = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
    match &parsed.command {
        Commands::List => {}
        other => panic!("expected Commands::List, got {other:?}"),
    }
}
2517
/// The alias `l` parses as `Commands::Logs` with the given session name.
#[test]
fn test_cli_parse_alias_logs() {
    let parsed = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
    let Commands::Logs { name, .. } = &parsed.command else {
        panic!("expected Commands::Logs, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
2526
/// The alias `k` parses as `Commands::Kill` with the given session name.
#[test]
fn test_cli_parse_alias_kill() {
    let parsed = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
    match &parsed.command {
        Commands::Kill { name } => assert_eq!(name, "my-session"),
        other => panic!("expected Commands::Kill, got {other:?}"),
    }
}
2535
/// The alias `rm` parses as `Commands::Delete` with the given session name.
#[test]
fn test_cli_parse_alias_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
    let Commands::Delete { name } = &parsed.command else {
        panic!("expected Commands::Delete, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
2544
/// The alias `r` parses as `Commands::Resume` with the given session name.
#[test]
fn test_cli_parse_alias_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
    match &parsed.command {
        Commands::Resume { name } => assert_eq!(name, "my-session"),
        other => panic!("expected Commands::Resume, got {other:?}"),
    }
}
2553
/// The alias `n` parses as `Commands::Nodes`.
#[test]
fn test_cli_parse_alias_nodes() {
    let parsed = Cli::try_parse_from(["pulpo", "n"]).unwrap();
    match &parsed.command {
        Commands::Nodes => {}
        other => panic!("expected Commands::Nodes, got {other:?}"),
    }
}
2559
/// The alias `iv` parses as `Commands::Interventions` with the given session name.
#[test]
fn test_cli_parse_alias_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
    let Commands::Interventions { name } = &parsed.command else {
        panic!("expected Commands::Interventions, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
2568
/// A JSON body with an `error` field is unwrapped to just that field's text.
#[test]
fn test_api_error_json() {
    let message = api_error(r#"{"error":"session not found: foo"}"#).to_string();
    assert_eq!(message, "session not found: foo");
}
2574
/// A non-JSON body is used verbatim as the error message.
#[test]
fn test_api_error_plain_text() {
    let body = "plain text error";
    let message = api_error(body).to_string();
    assert_eq!(message, "plain text error");
}
2580
/// With no previous output, the whole new output is the diff.
#[test]
fn test_diff_output_empty_prev() {
    let fresh = "line1\nline2\n";
    let delta = diff_output("", fresh);
    assert_eq!(delta, "line1\nline2\n");
}
2587
/// Identical snapshots yield an empty diff.
#[test]
fn test_diff_output_identical() {
    let snapshot = "line1\nline2";
    let delta = diff_output(snapshot, "line1\nline2");
    assert_eq!(delta, "");
}
2592
/// When the new snapshot extends the old one, only the appended lines are returned.
#[test]
fn test_diff_output_new_lines_appended() {
    let delta = diff_output("line1\nline2", "line1\nline2\nline3\nline4");
    assert_eq!(delta, "line3\nline4");
}
2599
/// When the window scrolls (oldest line dropped, one line added), only the added line is new.
#[test]
fn test_diff_output_scrolled_window() {
    let before = "line1\nline2\nline3";
    // `line1` has scrolled off the top; `line4` arrived at the bottom.
    let after = "line2\nline3\nline4";
    let delta = diff_output(before, after);
    assert_eq!(delta, "line4");
}
2607
/// With no overlap between snapshots, the entire new snapshot is returned.
#[test]
fn test_diff_output_completely_different() {
    let delta = diff_output("aaa\nbbb", "xxx\nyyy");
    assert_eq!(delta, "xxx\nyyy");
}
2614
/// A shared last line alone is not a valid overlap when the preceding lines differ.
#[test]
fn test_diff_output_last_line_matches_but_overlap_fails() {
    let before = "aaa\ncommon";
    // `common` appears in both, but what precedes it differs (`aaa` vs `zzz`),
    // so the expected diff is the full new snapshot.
    let after = "zzz\ncommon\nnew_line";
    let delta = diff_output(before, after);
    assert_eq!(delta, "zzz\ncommon\nnew_line");
}
2625
/// An empty new snapshot yields an empty diff regardless of the previous content.
#[test]
fn test_diff_output_new_empty() {
    let delta = diff_output("line1", "");
    assert_eq!(delta, "");
}
2630
2631 async fn start_follow_test_server() -> String {
2635 use axum::{Router, extract::Path, extract::Query, routing::get};
2636 use std::sync::Arc;
2637 use std::sync::atomic::{AtomicUsize, Ordering};
2638
2639 let call_count = Arc::new(AtomicUsize::new(0));
2640 let output_count = call_count.clone();
2641 let status_count = Arc::new(AtomicUsize::new(0));
2642 let status_count_inner = status_count.clone();
2643
2644 let app = Router::new()
2645 .route(
2646 "/api/v1/sessions/{id}/output",
2647 get(
2648 move |_path: Path<String>,
2649 _query: Query<std::collections::HashMap<String, String>>| {
2650 let count = output_count.clone();
2651 async move {
2652 let n = count.fetch_add(1, Ordering::SeqCst);
2653 let output = match n {
2654 0 => "line1\nline2".to_owned(),
2655 1 => "line1\nline2\nline3".to_owned(),
2656 _ => "line2\nline3\nline4".to_owned(),
2657 };
2658 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2659 }
2660 },
2661 ),
2662 )
2663 .route(
2664 "/api/v1/sessions/{id}",
2665 get(move |_path: Path<String>| {
2666 let count = status_count_inner.clone();
2667 async move {
2668 let n = count.fetch_add(1, Ordering::SeqCst);
2669 let status = if n < 2 { "running" } else { "completed" };
2670 format!(
2671 r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
2672 )
2673 }
2674 }),
2675 );
2676
2677 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2678 let addr = listener.local_addr().unwrap();
2679 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2680 format!("http://127.0.0.1:{}", addr.port())
2681 }
2682
2683 #[tokio::test]
2684 async fn test_follow_logs_polls_and_exits_on_completed() {
2685 let base = start_follow_test_server().await;
2686 let client = reqwest::Client::new();
2687 let mut buf = Vec::new();
2688
2689 follow_logs(&client, &base, "test", 100, None, &mut buf)
2690 .await
2691 .unwrap();
2692
2693 let output = String::from_utf8(buf).unwrap();
2694 assert!(output.contains("line1"));
2696 assert!(output.contains("line2"));
2697 assert!(output.contains("line3"));
2698 assert!(output.contains("line4"));
2699 }
2700
2701 #[tokio::test]
2702 async fn test_execute_logs_follow_success() {
2703 let base = start_follow_test_server().await;
2704 let node = base.strip_prefix("http://").unwrap().to_owned();
2706
2707 let cli = Cli {
2708 node,
2709 token: None,
2710 command: Commands::Logs {
2711 name: "test".into(),
2712 lines: 100,
2713 follow: true,
2714 },
2715 };
2716 let result = execute(&cli).await.unwrap();
2718 assert_eq!(result, "");
2719 }
2720
2721 #[tokio::test]
2722 async fn test_execute_logs_follow_connection_refused() {
2723 let cli = Cli {
2724 node: "localhost:1".into(),
2725 token: None,
2726 command: Commands::Logs {
2727 name: "test".into(),
2728 lines: 50,
2729 follow: true,
2730 },
2731 };
2732 let result = execute(&cli).await;
2733 let err = result.unwrap_err().to_string();
2734 assert!(
2735 err.contains("Could not connect to pulpod"),
2736 "Expected friendly error, got: {err}"
2737 );
2738 }
2739
2740 #[tokio::test]
2741 async fn test_follow_logs_exits_on_dead() {
2742 use axum::{Router, extract::Path, extract::Query, routing::get};
2743
2744 let app = Router::new()
2745 .route(
2746 "/api/v1/sessions/{id}/output",
2747 get(
2748 |_path: Path<String>,
2749 _query: Query<std::collections::HashMap<String, String>>| async {
2750 r#"{"output":"some output"}"#.to_owned()
2751 },
2752 ),
2753 )
2754 .route(
2755 "/api/v1/sessions/{id}",
2756 get(|_path: Path<String>| async {
2757 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2758 }),
2759 );
2760
2761 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2762 let addr = listener.local_addr().unwrap();
2763 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2764 let base = format!("http://127.0.0.1:{}", addr.port());
2765
2766 let client = reqwest::Client::new();
2767 let mut buf = Vec::new();
2768 follow_logs(&client, &base, "test", 100, None, &mut buf)
2769 .await
2770 .unwrap();
2771
2772 let output = String::from_utf8(buf).unwrap();
2773 assert!(output.contains("some output"));
2774 }
2775
2776 #[tokio::test]
2777 async fn test_follow_logs_exits_on_stale() {
2778 use axum::{Router, extract::Path, extract::Query, routing::get};
2779
2780 let app = Router::new()
2781 .route(
2782 "/api/v1/sessions/{id}/output",
2783 get(
2784 |_path: Path<String>,
2785 _query: Query<std::collections::HashMap<String, String>>| async {
2786 r#"{"output":"stale output"}"#.to_owned()
2787 },
2788 ),
2789 )
2790 .route(
2791 "/api/v1/sessions/{id}",
2792 get(|_path: Path<String>| async {
2793 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2794 }),
2795 );
2796
2797 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2798 let addr = listener.local_addr().unwrap();
2799 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2800 let base = format!("http://127.0.0.1:{}", addr.port());
2801
2802 let client = reqwest::Client::new();
2803 let mut buf = Vec::new();
2804 follow_logs(&client, &base, "test", 100, None, &mut buf)
2805 .await
2806 .unwrap();
2807
2808 let output = String::from_utf8(buf).unwrap();
2809 assert!(output.contains("stale output"));
2810 }
2811
2812 #[tokio::test]
2813 async fn test_execute_logs_follow_non_reqwest_error() {
2814 use axum::{Router, extract::Path, extract::Query, routing::get};
2815
2816 let app = Router::new()
2818 .route(
2819 "/api/v1/sessions/{id}/output",
2820 get(
2821 |_path: Path<String>,
2822 _query: Query<std::collections::HashMap<String, String>>| async {
2823 r#"{"output":"initial"}"#.to_owned()
2824 },
2825 ),
2826 )
2827 .route(
2828 "/api/v1/sessions/{id}",
2829 get(|_path: Path<String>| async { "not valid json".to_owned() }),
2830 );
2831
2832 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2833 let addr = listener.local_addr().unwrap();
2834 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2835 let node = format!("127.0.0.1:{}", addr.port());
2836
2837 let cli = Cli {
2838 node,
2839 token: None,
2840 command: Commands::Logs {
2841 name: "test".into(),
2842 lines: 100,
2843 follow: true,
2844 },
2845 };
2846 let err = execute(&cli).await.unwrap_err();
2847 let msg = err.to_string();
2849 assert!(
2850 msg.contains("expected ident"),
2851 "Expected serde parse error, got: {msg}"
2852 );
2853 }
2854
/// `spawn` accepts `--max-turns`, `--max-budget` and `--output-format` and stores each value.
#[test]
fn test_cli_parse_spawn_with_guardrails() {
    let argv = [
        "pulpo",
        "spawn",
        "--workdir",
        "/tmp",
        "--max-turns",
        "10",
        "--max-budget",
        "5.5",
        "--output-format",
        "json",
        "Do it",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();

    let Commands::Spawn {
        max_turns,
        max_budget,
        output_format,
        ..
    } = &parsed.command
    else {
        panic!("expected Commands::Spawn, got {:?}", parsed.command);
    };
    assert_eq!(*max_turns, Some(10));
    assert_eq!(*max_budget, Some(5.5));
    assert_eq!(output_format.as_deref(), Some("json"));
}
2878
2879 #[tokio::test]
2880 async fn test_fetch_session_status_connection_error() {
2881 let client = reqwest::Client::new();
2882 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
2883 assert!(result.is_err());
2884 }
2885
/// `build_crontab_line` renders the cron spec, the spawn command and a `#pulpo:<name>` tag.
#[test]
fn test_build_crontab_line() {
    let expected = "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n";

    let rendered = build_crontab_line(
        "nightly-review",
        "0 3 * * *",
        "/home/me/repo",
        "claude",
        "Review PRs",
        "localhost:7433",
    );

    assert_eq!(rendered, expected);
}
2903
/// Installing a schedule appends the new line and keeps the existing crontab content.
#[test]
fn test_crontab_install_success() {
    let existing = "# existing cron\n0 * * * * echo hi\n";
    let entry = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";

    let updated = crontab_install(existing, "my-job", entry).unwrap();

    // Existing content stays first, the new entry lands at the end.
    assert!(updated.starts_with("# existing cron\n"));
    assert!(updated.ends_with("#pulpo:my-job\n"));
    assert!(updated.contains("echo hi"));
}
2913
/// Installing a schedule whose name is already tagged in the crontab is rejected.
#[test]
fn test_crontab_install_duplicate_error() {
    let existing = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let entry = "0 4 * * * pulpo spawn other #pulpo:my-job\n";

    let message = crontab_install(existing, "my-job", entry)
        .unwrap_err()
        .to_string();
    assert!(message.contains("already exists"));
}
2921
/// An empty crontab lists as "No pulpo schedules.".
#[test]
fn test_crontab_list_empty() {
    let listing = crontab_list("");
    assert_eq!(listing, "No pulpo schedules.");
}
2926
/// A crontab without any `#pulpo:` tagged line lists as "No pulpo schedules.".
#[test]
fn test_crontab_list_no_pulpo_entries() {
    let unrelated = "0 * * * * echo hi\n";
    let listing = crontab_list(unrelated);
    assert_eq!(listing, "No pulpo schedules.");
}
2931
/// An active schedule is listed with the table headers, its name, cron spec and `no` for paused.
#[test]
fn test_crontab_list_with_entries() {
    let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
    let listing = crontab_list(crontab);

    // Column headers first, then the row's values.
    let expected_fragments = ["NAME", "CRON", "PAUSED", "nightly", "0 3 * * *", "no"];
    for fragment in expected_fragments {
        assert!(listing.contains(fragment));
    }
}
2943
/// A commented-out (`#`-prefixed) schedule line is listed as paused.
#[test]
fn test_crontab_list_paused_entry() {
    let listing = crontab_list("#0 3 * * * pulpo spawn task #pulpo:paused-job\n");
    assert!(listing.contains("paused-job"));
    assert!(listing.contains("yes"));
}
2951
/// A tagged line with too few fields for a cron spec is still listed, with a `?` in the output.
#[test]
fn test_crontab_list_short_line() {
    // Only one field precedes the tag, so no five-field cron expression is present.
    let malformed = "badcron #pulpo:broken\n";
    let listing = crontab_list(malformed);
    assert!(listing.contains("broken"));
    assert!(listing.contains('?'));
}
2960
/// Removing a schedule drops only its tagged line and keeps unrelated entries.
#[test]
fn test_crontab_remove_success() {
    let existing = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let remaining = crontab_remove(existing, "my-job").unwrap();
    assert!(remaining.contains("echo hi"));
    assert!(!remaining.contains("my-job"));
}
2968
/// Removing a schedule that is not in the crontab fails with a "not found" error.
#[test]
fn test_crontab_remove_not_found() {
    let message = crontab_remove("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found"));
}
2975
/// Pausing an active schedule comments its line out while keeping the tag.
#[test]
fn test_crontab_pause_success() {
    let active = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let paused = crontab_pause(active, "my-job").unwrap();
    assert!(paused.starts_with('#'));
    assert!(paused.contains("#pulpo:my-job"));
}
2983
/// Pausing a schedule that is not in the crontab fails with the combined error message.
#[test]
fn test_crontab_pause_not_found() {
    let message = crontab_pause("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found or already paused"));
}
2990
/// Pausing a schedule whose line is already commented out is an error.
#[test]
fn test_crontab_pause_already_paused() {
    let already_paused = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let message = crontab_pause(already_paused, "my-job")
        .unwrap_err()
        .to_string();
    assert!(message.contains("already paused"));
}
2997
/// Resuming a paused schedule removes the leading `#` while keeping the tag.
#[test]
fn test_crontab_resume_success() {
    let paused = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let resumed = crontab_resume(paused, "my-job").unwrap();
    assert!(!resumed.starts_with('#'));
    assert!(resumed.contains("#pulpo:my-job"));
}
3005
/// Resuming a schedule that is not in the crontab fails with the combined error message.
#[test]
fn test_crontab_resume_not_found() {
    let message = crontab_resume("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found or not paused"));
}
3012
/// Resuming a schedule whose line is already active is an error.
#[test]
fn test_crontab_resume_not_paused() {
    let active = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let message = crontab_resume(active, "my-job").unwrap_err().to_string();
    assert!(message.contains("not paused"));
}
3019
/// `schedule install` captures name, cron spec, workdir, the default provider and the prompt words.
#[test]
fn test_cli_parse_schedule_install() {
    let argv = [
        "pulpo",
        "schedule",
        "install",
        "nightly",
        "0 3 * * *",
        "--workdir",
        "/tmp/repo",
        "Review",
        "PRs",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();

    let Commands::Schedule {
        action:
            ScheduleAction::Install {
                name,
                cron,
                workdir,
                provider,
                prompt,
            },
    } = &parsed.command
    else {
        panic!("expected schedule install, got {:?}", parsed.command);
    };
    assert_eq!(name, "nightly");
    assert_eq!(cron, "0 3 * * *");
    assert_eq!(workdir, "/tmp/repo");
    // No `--provider` flag was passed, so this is the default value.
    assert_eq!(provider, "claude");
    assert_eq!(prompt, &["Review", "PRs"]);
}
3044
/// `schedule list` parses as `ScheduleAction::List`.
#[test]
fn test_cli_parse_schedule_list() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
    match &parsed.command {
        Commands::Schedule {
            action: ScheduleAction::List,
        } => {}
        other => panic!("expected schedule list, got {other:?}"),
    }
}
3055
/// `schedule remove <name>` parses as `ScheduleAction::Remove` with that name.
#[test]
fn test_cli_parse_schedule_remove() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Remove { name },
    } = &parsed.command
    else {
        panic!("expected schedule remove, got {:?}", parsed.command);
    };
    assert_eq!(name, "nightly");
}
3066
/// `schedule pause <name>` parses as `ScheduleAction::Pause` with that name.
#[test]
fn test_cli_parse_schedule_pause() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
    match &parsed.command {
        Commands::Schedule {
            action: ScheduleAction::Pause { name },
        } => assert_eq!(name, "nightly"),
        other => panic!("expected schedule pause, got {other:?}"),
    }
}
3077
/// `schedule resume <name>` parses as `ScheduleAction::Resume` with that name.
#[test]
fn test_cli_parse_schedule_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Resume { name },
    } = &parsed.command
    else {
        panic!("expected schedule resume, got {:?}", parsed.command);
    };
    assert_eq!(name, "nightly");
}
3088
/// The top-level alias `sched` resolves to the `schedule` subcommand.
#[test]
fn test_cli_parse_schedule_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
    let is_schedule_list = matches!(
        &parsed.command,
        Commands::Schedule {
            action: ScheduleAction::List
        }
    );
    assert!(is_schedule_list);
}
3099
/// The nested alias `schedule ls` resolves to `ScheduleAction::List`.
#[test]
fn test_cli_parse_schedule_list_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
    match &parsed.command {
        Commands::Schedule {
            action: ScheduleAction::List,
        } => {}
        other => panic!("expected schedule list, got {other:?}"),
    }
}
3110
/// The nested alias `schedule rm <name>` resolves to `ScheduleAction::Remove`.
#[test]
fn test_cli_parse_schedule_remove_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Remove { name },
    } = &parsed.command
    else {
        panic!("expected schedule remove, got {:?}", parsed.command);
    };
    assert_eq!(name, "nightly");
}
3121
/// An explicit `--provider` on `schedule install` overrides the default provider.
#[test]
fn test_cli_parse_schedule_install_custom_provider() {
    let argv = [
        "pulpo",
        "schedule",
        "install",
        "daily",
        "0 9 * * *",
        "--workdir",
        "/tmp",
        "--provider",
        "codex",
        "Run tests",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();

    match &parsed.command {
        Commands::Schedule {
            action: ScheduleAction::Install { provider, .. },
        } => assert_eq!(provider, "codex"),
        other => panic!("expected schedule install, got {other:?}"),
    }
}
3144
3145 #[tokio::test]
3146 async fn test_execute_schedule_via_execute() {
3147 let node = start_test_server().await;
3149 let cli = Cli {
3150 node,
3151 token: None,
3152 command: Commands::Schedule {
3153 action: ScheduleAction::List,
3154 },
3155 };
3156 let result = execute(&cli).await;
3157 assert!(result.is_ok() || result.is_err());
3159 }
3160
/// The derived `Debug` output for `ScheduleAction::List` is the bare variant name.
#[test]
fn test_schedule_action_debug() {
    let rendered = format!("{:?}", ScheduleAction::List);
    assert_eq!(rendered, "List");
}
3166}