1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5 AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6 PeersResponse,
7};
8#[cfg(test)]
9use pulpo_common::session::Runtime;
10use pulpo_common::session::Session;
11
// Top-level command-line interface of the `pulpo` client.
//
// NOTE: plain `//` comments are used on purpose inside this type. With clap's
// derive API, `///` doc comments are turned into user-visible help text, so
// adding them would change the program's `--help` output.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version = env!("PULPO_VERSION"),
    args_conflicts_with_subcommands = true
)]
pub struct Cli {
    // Node to talk to. A value containing `:` is used verbatim as `host:port`;
    // a value without `:` is treated as a peer name and resolved by `resolve_node`.
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    // Explicit bearer token. When present, `resolve_token` returns it before
    // attempting any discovery.
    #[arg(long)]
    pub token: Option<String>,

    // Subcommand to run. When absent, `execute` either uses `path` or prints help.
    #[command(subcommand)]
    pub command: Option<Commands>,

    // Positional directory. With no subcommand, `execute` creates a session with
    // this path as its workdir and then attaches to it.
    #[arg(value_name = "PATH")]
    pub path: Option<String>,
}
35
// Subcommands of the `pulpo` CLI, dispatched by `execute`.
//
// NOTE: plain `//` comments are used on purpose inside this type. With clap's
// derive API, `///` doc comments are turned into user-visible help text, so
// adding them would change the program's `--help` output.
#[derive(Subcommand, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    // Looks the session up over the API, refuses "lost"/"stopped" sessions, then
    // attaches to its backend session.
    #[command(visible_alias = "a")]
    Attach {
        // Session name, interpolated into the sessions API path.
        name: String,
    },

    // Posts text to a session's `/input` endpoint.
    #[command(visible_alias = "i", visible_alias = "send")]
    Input {
        // Session name.
        name: String,
        // Text to send; `execute` sends a single "\n" when omitted.
        text: Option<String>,
    },

    // Creates a new session. Only the first part of the handler is visible in
    // this excerpt; flags not described below are consumed further down.
    #[command(visible_alias = "s")]
    Spawn {
        // Session name; when omitted it is derived from the workdir and de-duplicated.
        name: Option<String>,

        // Working directory; defaults to the current directory.
        #[arg(long)]
        workdir: Option<String>,

        // Sent as the `ink` field of the request body when set.
        #[arg(long)]
        ink: Option<String>,

        // Sent as the `description` field of the request body when set.
        #[arg(long)]
        description: Option<String>,

        // NOTE(review): presumably skips attaching after creation — handler not
        // visible here, confirm.
        #[arg(short, long)]
        detach: bool,

        // Sent as `idle_threshold_secs` in the request body when set.
        #[arg(long)]
        idle_threshold: Option<u32>,

        // NOTE(review): usage not visible in this excerpt (`select_best_node`
        // exists in this file and may be related) — confirm.
        #[arg(long)]
        auto: bool,

        // Worktree options; `execute` checks `worktree || worktree_base.is_some()`.
        #[arg(long)]
        worktree: bool,

        #[arg(long = "worktree-base")]
        worktree_base: Option<String>,

        // NOTE(review): usage not visible in this excerpt — confirm.
        #[arg(long)]
        runtime: Option<String>,

        // Repeatable flag. NOTE(review): usage not visible in this excerpt — confirm.
        #[arg(long)]
        secret: Vec<String>,

        // Everything after `--`; joined with single spaces into the `command` field.
        #[arg(last = true)]
        command: Vec<String>,
    },

    // Lists sessions as a table.
    #[command(visible_alias = "ls")]
    List {
        // When false, the request is filtered to
        // `status=creating,active,idle,ready`; when true, no filter is sent.
        #[arg(short, long)]
        all: bool,
    },

    // Handler not visible in this excerpt; `fetch_output` / `follow_logs` in this
    // file take matching `name`, `lines` arguments.
    #[command(visible_alias = "l")]
    Logs {
        name: String,

        #[arg(long, default_value = "100")]
        lines: usize,

        #[arg(short, long)]
        follow: bool,
    },

    // Handler not visible in this excerpt. At least one name is required.
    #[command(visible_alias = "k", alias = "kill")]
    Stop {
        #[arg(required = true)]
        names: Vec<String>,

        #[arg(long, short = 'p')]
        purge: bool,
    },

    // Handler not visible in this excerpt.
    Cleanup,

    // Handler not visible in this excerpt.
    #[command(visible_alias = "r")]
    Resume {
        name: String,
    },

    // Fetches `/api/v1/peers` and prints the table built by `format_nodes`.
    #[command(visible_alias = "n")]
    Nodes,

    // Handler not visible in this excerpt; `format_interventions` renders the events.
    #[command(visible_alias = "iv")]
    Interventions {
        name: String,
    },

    // Handler not visible in this excerpt.
    Ui,

    // Schedule management; see `ScheduleAction` / `execute_schedule`.
    #[command(visible_alias = "sched")]
    Schedule {
        #[command(subcommand)]
        action: ScheduleAction,
    },

    // Secret management; see `SecretAction` / `execute_secret`.
    #[command(visible_alias = "sec")]
    Secret {
        #[command(subcommand)]
        action: SecretAction,
    },

    // Worktree inspection; see `WorktreeAction` / `execute_worktree`.
    #[command(visible_alias = "wt")]
    Worktree {
        #[command(subcommand)]
        action: WorktreeAction,
    },
}
186
// Actions of the `secret` subcommand, handled by `execute_secret`.
//
// NOTE: `//` comments (not `///`) are used so clap's generated help is unchanged.
#[derive(Subcommand, Debug)]
pub enum SecretAction {
    // PUTs `{ "value": ..., "env": ... }` to `/api/v1/secrets/{name}`.
    Set {
        // Secret name (part of the URL path).
        name: String,
        // Secret value, sent in the request body.
        value: String,
        // Optional `env` field; only included in the body when given.
        #[arg(long)]
        env: Option<String>,
    },
    // GETs `/api/v1/secrets` and prints the table built by `format_secrets`.
    #[command(visible_alias = "ls")]
    List,
    // DELETEs `/api/v1/secrets/{name}`.
    #[command(visible_alias = "rm")]
    Delete {
        name: String,
    },
}
209
// Actions of the `worktree` subcommand, handled by `execute_worktree`.
//
// NOTE: `//` comments (not `///`) are used so clap's generated help is unchanged.
#[derive(Subcommand, Debug)]
pub enum WorktreeAction {
    // Lists the sessions whose `worktree_path` is set.
    #[command(visible_alias = "ls")]
    List,
}
216
// Actions of the `schedule` subcommand, handled by `execute_schedule`.
//
// NOTE: `//` comments (not `///`) are used so clap's generated help is unchanged.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    // POSTs a new schedule to `/api/v1/schedules`.
    #[command(alias = "install")]
    Add {
        // Schedule name.
        name: String,
        // Cron expression, sent verbatim as the `cron` field.
        cron: String,
        // Working directory; defaults to the current directory.
        #[arg(long)]
        workdir: Option<String>,
        // Sent as `target_node` when set.
        #[arg(long)]
        node: Option<String>,
        // Sent as `ink` when set.
        #[arg(long)]
        ink: Option<String>,
        // Sent as `description` when set.
        #[arg(long)]
        description: Option<String>,
        // Everything after `--`; joined with single spaces into `command`.
        #[arg(last = true)]
        command: Vec<String>,
    },
    // GETs `/api/v1/schedules` and prints the table built by `format_schedules`.
    #[command(alias = "ls")]
    List,
    // DELETEs `/api/v1/schedules/{name}`.
    #[command(alias = "rm")]
    Remove {
        name: String,
    },
    // PUTs `{ "enabled": false }` to `/api/v1/schedules/{name}`.
    Pause {
        name: String,
    },
    // PUTs `{ "enabled": true }` to `/api/v1/schedules/{name}`.
    Resume {
        name: String,
    },
}
262
/// Text that `follow_logs` searches for in fetched session output; once it
/// appears, log following stops.
const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
265
/// Turns `path` into an absolute path string.
///
/// Absolute inputs are returned unchanged. Relative inputs are joined onto the
/// current working directory; if the working directory cannot be determined,
/// the input is returned as-is. No normalisation (`..`, symlinks) is performed.
fn resolve_path(path: &str) -> String {
    let candidate = std::path::Path::new(path);
    if candidate.is_absolute() {
        return path.to_owned();
    }
    match std::env::current_dir() {
        Ok(cwd) => cwd.join(candidate).to_string_lossy().into_owned(),
        Err(_) => path.to_owned(),
    }
}
278
/// Derives a kebab-case session name from the last component of `path`.
///
/// ASCII letters and digits are kept (lower-cased); every other character
/// becomes `-`, runs of `-` are collapsed to one, and leading/trailing `-` are
/// removed. Falls back to `"session"` when the path has no UTF-8 file name or
/// nothing is left after cleaning.
fn derive_session_name(path: &str) -> String {
    const FALLBACK: &str = "session";

    let basename = match std::path::Path::new(path).file_name() {
        Some(os_name) => os_name.to_str().unwrap_or(FALLBACK),
        None => FALLBACK,
    };

    // Single pass: emit a dash only when the previous emitted char is not one.
    let mut slug = String::with_capacity(basename.len());
    for ch in basename.chars() {
        if ch.is_ascii_alphanumeric() {
            slug.push(ch.to_ascii_lowercase());
        } else if !slug.ends_with('-') {
            slug.push('-');
        }
    }

    match slug.trim_matches('-') {
        "" => FALLBACK.to_owned(),
        trimmed => trimmed.to_owned(),
    }
}
311
312async fn deduplicate_session_name(
314 client: &reqwest::Client,
315 base: &str,
316 name: &str,
317 token: Option<&str>,
318) -> String {
319 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
321 .send()
322 .await;
323 match resp {
324 Ok(r) if r.status().is_success() => {
325 for i in 2..=99 {
327 let candidate = format!("{name}-{i}");
328 let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
329 .send()
330 .await;
331 match resp {
332 Ok(r) if r.status().is_success() => {}
333 _ => return candidate,
334 }
335 }
336 format!("{name}-100")
337 }
338 _ => name.to_owned(),
339 }
340}
341
/// Builds the HTTP base URL for `node` (a `host:port` string) by prefixing
/// `http://`.
pub fn base_url(node: &str) -> String {
    ["http://", node].concat()
}
346
/// JSON body of the session output endpoint, as parsed by `fetch_output`.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// Output text returned by the server.
    output: String,
}
352
353fn format_branch(session: &Session) -> String {
357 let branch = session.git_branch.as_deref().unwrap_or("-").to_owned();
358
359 let mut suffix = String::new();
360 let ins = session.git_insertions.unwrap_or(0);
361 let del = session.git_deletions.unwrap_or(0);
362 if ins > 0 || del > 0 {
363 suffix = format!(" +{ins}/-{del}");
364 }
365 if let Some(ahead) = session.git_ahead
366 && ahead > 0
367 {
368 suffix = format!("{suffix} \u{2191}{ahead}");
369 }
370
371 format!("{branch}{suffix}")
372}
373
374fn format_name(session: &Session) -> String {
376 let mut name = session.name.clone();
377 if session.worktree_path.is_some() {
378 name = format!("{name} [wt]");
379 }
380 if session
381 .metadata
382 .as_ref()
383 .is_some_and(|m| m.contains_key("pr_url"))
384 {
385 name = format!("{name} [PR]");
386 }
387 if session
388 .metadata
389 .as_ref()
390 .is_some_and(|m| m.contains_key("error_status"))
391 {
392 name = format!("{name} [!]");
393 }
394 name
395}
396
/// Shortens `s` to at most `max` characters, marking the cut with `...`.
///
/// Strings of `max` characters or fewer are returned unchanged. Longer strings
/// keep their first `max - 3` characters followed by `...`. Lengths are counted
/// in `char`s, not bytes, so non-ASCII text that already fits is left alone.
///
/// Note: for `max < 3` an over-long input still yields `"..."`, which is longer
/// than `max`.
fn truncate(s: &str, max: usize) -> String {
    // The byte length is an upper bound on the char count, so the cheap check
    // settles the common ASCII case without walking the string.
    if s.len() <= max || s.chars().count() <= max {
        return s.to_owned();
    }
    let kept: String = s.chars().take(max.saturating_sub(3)).collect();
    format!("{kept}...")
}
406
407fn format_sessions(sessions: &[Session]) -> String {
408 if sessions.is_empty() {
409 return "No sessions.".into();
410 }
411
412 let rows: Vec<(String, String, String, String, String)> = sessions
414 .iter()
415 .map(|s| {
416 (
417 s.id.to_string()[..8].to_owned(),
418 format_name(s),
419 s.status.to_string(),
420 format_branch(s),
421 s.command.clone(),
422 )
423 })
424 .collect();
425
426 let w_id = 8;
427 let w_name = rows.iter().map(|r| r.1.len()).max().unwrap_or(4).max(4);
428 let w_status = 8;
429 let w_branch = rows.iter().map(|r| r.3.len()).max().unwrap_or(6).max(6);
430
431 let mut lines = vec![format!(
432 "{:<w_id$} {:<w_name$} {:<w_status$} {:<w_branch$} {}",
433 "ID", "NAME", "STATUS", "BRANCH", "COMMAND"
434 )];
435 for (id, name, status, branch, cmd) in &rows {
436 lines.push(format!(
437 "{:<w_id$} {:<w_name$} {:<w_status$} {:<w_branch$} {}",
438 id,
439 truncate(name, w_name),
440 status,
441 truncate(branch, w_branch),
442 truncate(cmd, 50)
443 ));
444 }
445 lines.join("\n")
446}
447
448fn format_nodes(resp: &PeersResponse) -> String {
450 let mut lines = vec![format!(
451 "{:<20} {:<25} {:<10} {}",
452 "NAME", "ADDRESS", "STATUS", "SESSIONS"
453 )];
454 lines.push(format!(
455 "{:<20} {:<25} {:<10} {}",
456 resp.local.name, "(local)", "online", "-"
457 ));
458 for p in &resp.peers {
459 let sessions = p
460 .session_count
461 .map_or_else(|| "-".into(), |c| c.to_string());
462 lines.push(format!(
463 "{:<20} {:<25} {:<10} {}",
464 p.name, p.address, p.status, sessions
465 ));
466 }
467 lines.join("\n")
468}
469
470fn format_interventions(events: &[InterventionEventResponse]) -> String {
472 if events.is_empty() {
473 return "No intervention events.".into();
474 }
475 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
476 for e in events {
477 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
478 }
479 lines.join("\n")
480}
481
/// Builds (without running) the platform command that opens `url` in the
/// default handler: `open <url>` on macOS, `cmd /C start <url>` on Windows and
/// `xdg-open <url>` everywhere else.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    // Program plus the fixed arguments that precede the URL, chosen at compile time.
    #[cfg(target_os = "macos")]
    let (program, leading_args): (&str, &[&str]) = ("open", &[]);
    #[cfg(target_os = "windows")]
    let (program, leading_args): (&str, &[&str]) = ("cmd", &["/C", "start"]);
    #[cfg(not(any(target_os = "macos", target_os = "windows")))]
    let (program, leading_args): (&str, &[&str]) = ("xdg-open", &[]);

    let mut opener = std::process::Command::new(program);
    opener.args(leading_args).arg(url);
    opener
}
511
512#[cfg(not(coverage))]
514fn open_browser(url: &str) -> Result<()> {
515 build_open_command(url).status()?;
516 Ok(())
517}
518
/// Coverage-build stand-in for `open_browser`: launches nothing and always
/// succeeds.
#[cfg(coverage)]
fn open_browser(_url: &str) -> Result<()> {
    Result::Ok(())
}
524
/// Builds (without running) the command used to attach to a backend session.
///
/// * `docker:<container>` ids map to `docker exec -it <container> /bin/sh`.
/// * Any other id maps to `tmux attach-session -t <id>`; on Windows it maps to
///   a `cmd /C echo …` that prints a "not available" notice instead.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    use std::process::Command;

    match backend_session_id.strip_prefix("docker:") {
        Some(container) => {
            let mut docker = Command::new("docker");
            docker.arg("exec").arg("-it").arg(container).arg("/bin/sh");
            docker
        }
        None => {
            #[cfg(not(target_os = "windows"))]
            let (program, args) = ("tmux", ["attach-session", "-t", backend_session_id]);
            #[cfg(target_os = "windows")]
            let (program, args) = (
                "cmd",
                [
                    "/C",
                    "echo",
                    "Attach not available on Windows. Use the web UI or --runtime docker.",
                ],
            );

            let mut attach = Command::new(program);
            attach.args(args);
            attach
        }
    }
}
554
555#[cfg(not(any(test, coverage, target_os = "windows")))]
557fn attach_session(backend_session_id: &str) -> Result<()> {
558 let status = build_attach_command(backend_session_id).status()?;
559 if !status.success() {
560 anyhow::bail!("attach failed with {status}");
561 }
562 Ok(())
563}
564
/// Windows variant of `attach_session`: nothing is attached; a notice is printed
/// to stderr and the call succeeds.
#[cfg(all(target_os = "windows", not(test), not(coverage)))]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
    Result::Ok(())
}
571
/// Test/coverage stand-in for `attach_session`: spawns nothing and always
/// succeeds.
#[cfg(any(test, coverage))]
#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    Result::Ok(())
}
578
579fn api_error(text: &str) -> anyhow::Error {
581 serde_json::from_str::<serde_json::Value>(text)
582 .ok()
583 .and_then(|v| v["error"].as_str().map(String::from))
584 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
585}
586
587async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
589 if resp.status().is_success() {
590 Ok(resp.text().await?)
591 } else {
592 let text = resp.text().await?;
593 Err(api_error(&text))
594 }
595}
596
597fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
599 if err.is_connect() {
600 anyhow::anyhow!(
601 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
602 )
603 } else {
604 anyhow::anyhow!("Network error connecting to {node}: {err}")
605 }
606}
607
/// Reports whether `node` (a `host` or `host:port` string) refers to this
/// machine: host `localhost` or `127.0.0.1`, the bare IPv6 loopback `::1`, or
/// anything starting with `[::1]`.
fn is_localhost(node: &str) -> bool {
    // Text before the first ':' (the whole string when there is none).
    let host = node.split_once(':').map_or(node, |(before, _)| before);
    matches!(host, "localhost" | "127.0.0.1") || node == "::1" || node.starts_with("[::1]")
}
613
614async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
616 let resp = client
617 .get(format!("{base}/api/v1/auth/token"))
618 .send()
619 .await
620 .ok()?;
621 let body: AuthTokenResponse = resp.json().await.ok()?;
622 if body.token.is_empty() {
623 None
624 } else {
625 Some(body.token)
626 }
627}
628
629async fn resolve_token(
631 client: &reqwest::Client,
632 base: &str,
633 node: &str,
634 explicit: Option<&str>,
635) -> Option<String> {
636 if let Some(t) = explicit {
637 return Some(t.to_owned());
638 }
639 if is_localhost(node) {
640 return discover_token(client, base).await;
641 }
642 None
643}
644
/// Reports whether `node` must be resolved to an address: true exactly when it
/// contains no `:` (i.e. it carries no port and is not an IPv6 literal).
fn node_needs_resolution(node: &str) -> bool {
    node.find(':').is_none()
}
649
650#[cfg(not(coverage))]
657async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
658 if !node_needs_resolution(node) {
660 return (node.to_owned(), None);
661 }
662
663 let local_base = "http://localhost:7433";
665 let mut resolved_address: Option<String> = None;
666
667 if let Ok(resp) = client
668 .get(format!("{local_base}/api/v1/peers"))
669 .send()
670 .await
671 && let Ok(peers_resp) = resp.json::<PeersResponse>().await
672 {
673 for peer in &peers_resp.peers {
674 if peer.name == node {
675 resolved_address = Some(peer.address.clone());
676 break;
677 }
678 }
679 }
680
681 let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
682
683 let peer_token = if let Ok(resp) = client
685 .get(format!("{local_base}/api/v1/config"))
686 .send()
687 .await
688 && let Ok(config) = resp.json::<ConfigResponse>().await
689 && let Some(entry) = config.peers.get(node)
690 {
691 entry.token().map(String::from)
692 } else {
693 None
694 };
695
696 (address, peer_token)
697}
698
/// Coverage-build stand-in for `resolve_node`: performs no network lookups.
/// Bare names get the default port `7433` appended; other values pass through.
/// The token is always `None`.
#[cfg(coverage)]
async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
    let address = if node_needs_resolution(node) {
        format!("{node}:7433")
    } else {
        node.to_owned()
    };
    (address, None)
}
708
709fn authed_get(
711 client: &reqwest::Client,
712 url: String,
713 token: Option<&str>,
714) -> reqwest::RequestBuilder {
715 let req = client.get(url);
716 if let Some(t) = token {
717 req.bearer_auth(t)
718 } else {
719 req
720 }
721}
722
723fn authed_post(
725 client: &reqwest::Client,
726 url: String,
727 token: Option<&str>,
728) -> reqwest::RequestBuilder {
729 let req = client.post(url);
730 if let Some(t) = token {
731 req.bearer_auth(t)
732 } else {
733 req
734 }
735}
736
737fn authed_delete(
739 client: &reqwest::Client,
740 url: String,
741 token: Option<&str>,
742) -> reqwest::RequestBuilder {
743 let req = client.delete(url);
744 if let Some(t) = token {
745 req.bearer_auth(t)
746 } else {
747 req
748 }
749}
750
751#[cfg(not(coverage))]
753fn authed_put(
754 client: &reqwest::Client,
755 url: String,
756 token: Option<&str>,
757) -> reqwest::RequestBuilder {
758 let req = client.put(url);
759 if let Some(t) = token {
760 req.bearer_auth(t)
761 } else {
762 req
763 }
764}
765
766async fn fetch_output(
768 client: &reqwest::Client,
769 base: &str,
770 name: &str,
771 lines: usize,
772 token: Option<&str>,
773) -> Result<String> {
774 let resp = authed_get(
775 client,
776 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
777 token,
778 )
779 .send()
780 .await?;
781 let text = ok_or_api_error(resp).await?;
782 let output: OutputResponse = serde_json::from_str(&text)?;
783 Ok(output.output)
784}
785
786async fn fetch_session_status(
788 client: &reqwest::Client,
789 base: &str,
790 name: &str,
791 token: Option<&str>,
792) -> Result<String> {
793 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
794 .send()
795 .await?;
796 let text = ok_or_api_error(resp).await?;
797 let session: Session = serde_json::from_str(&text)?;
798 Ok(session.status.to_string())
799}
800
801async fn check_session_alive(
805 client: &reqwest::Client,
806 base: &str,
807 session_id: &str,
808 token: Option<&str>,
809) -> Result<()> {
810 for _ in 0..3 {
812 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
813 let resp = authed_get(
815 client,
816 format!("{base}/api/v1/sessions/{session_id}"),
817 token,
818 )
819 .send()
820 .await;
821 if let Ok(resp) = resp
822 && let Ok(text) = ok_or_api_error(resp).await
823 && let Ok(session) = serde_json::from_str::<Session>(&text)
824 {
825 match session.status.to_string().as_str() {
826 "creating" => continue,
827 "lost" | "stopped" => {
828 anyhow::bail!(
829 "Session \"{}\" exited immediately — the command may have failed.\n Check logs: pulpo logs {}",
830 session.name,
831 session.name
832 );
833 }
834 _ => return Ok(()),
835 }
836 }
837 break;
839 }
840 Ok(())
841}
842
/// Returns the part of `new` that was not already present in `prev`.
///
/// Both arguments are snapshots of the same scrolling output. The function
/// searches `new`, from the end backwards, for the last line of `prev`, checks
/// that the lines leading up to it match the tail of `prev`, and returns
/// everything in `new` after that point.
///
/// * `prev` empty — all of `new` is returned.
/// * `new` has no lines — `""` is returned.
/// * No overlap found — all of `new` is returned.
/// * `new` ends exactly where `prev` ended — `""` is returned.
///
/// The returned slice starts at a line boundary of `new` for both `\n` and
/// `\r\n` terminated input.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // A non-empty string always yields at least one line; the fallback only
    // keeps this free of panicking index arithmetic.
    let Some(&last_prev) = prev_lines.last() else {
        return new;
    };

    // Scan backwards so the latest occurrence of the anchor line wins.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }

        // Compare as many trailing lines of `prev` as fit before position `i`.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail != new_overlap {
            continue;
        }

        if i + 1 == new_lines.len() {
            return "";
        }

        // Byte offset of line `i + 1`. `split_inclusive('\n')` yields the same
        // lines as `lines()` but with their terminators still attached, so the
        // sum is exact whether lines end in "\n" or "\r\n".
        let consumed: usize = new
            .split_inclusive('\n')
            .take(i + 1)
            .map(str::len)
            .sum();
        return new.get(consumed..).unwrap_or("");
    }

    // No overlap: the window scrolled past everything we had seen.
    new
}
885
886async fn follow_logs(
888 client: &reqwest::Client,
889 base: &str,
890 name: &str,
891 lines: usize,
892 token: Option<&str>,
893 writer: &mut (dyn std::io::Write + Send),
894) -> Result<()> {
895 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
896 write!(writer, "{prev_output}")?;
897
898 let mut unchanged_ticks: u32 = 0;
899
900 loop {
901 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
902
903 let new_output = fetch_output(client, base, name, lines, token).await?;
905
906 let diff = diff_output(&prev_output, &new_output);
907 if diff.is_empty() {
908 unchanged_ticks += 1;
909 } else {
910 write!(writer, "{diff}")?;
911 unchanged_ticks = 0;
912 }
913
914 if new_output.contains(AGENT_EXIT_MARKER) {
916 break;
917 }
918
919 prev_output = new_output;
920
921 if unchanged_ticks >= 3 {
923 let status = fetch_session_status(client, base, name, token).await?;
924 let is_terminal = status == "ready" || status == "stopped" || status == "lost";
925 if is_terminal {
926 break;
927 }
928 }
929 }
930 Ok(())
931}
932
933#[cfg(not(coverage))]
937async fn execute_schedule(
938 client: &reqwest::Client,
939 action: &ScheduleAction,
940 base: &str,
941 token: Option<&str>,
942) -> Result<String> {
943 match action {
944 ScheduleAction::Add {
945 name,
946 cron,
947 workdir,
948 node,
949 ink,
950 description,
951 command,
952 } => {
953 let cmd = if command.is_empty() {
954 None
955 } else {
956 Some(command.join(" "))
957 };
958 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
959 std::env::current_dir()
960 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
961 });
962 let mut body = serde_json::json!({
963 "name": name,
964 "cron": cron,
965 "workdir": resolved_workdir,
966 });
967 if let Some(c) = &cmd {
968 body["command"] = serde_json::json!(c);
969 }
970 if let Some(n) = node {
971 body["target_node"] = serde_json::json!(n);
972 }
973 if let Some(i) = ink {
974 body["ink"] = serde_json::json!(i);
975 }
976 if let Some(d) = description {
977 body["description"] = serde_json::json!(d);
978 }
979 let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
980 .json(&body)
981 .send()
982 .await?;
983 ok_or_api_error(resp).await?;
984 Ok(format!("Created schedule \"{name}\""))
985 }
986 ScheduleAction::List => {
987 let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
988 .send()
989 .await?;
990 let text = ok_or_api_error(resp).await?;
991 let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
992 Ok(format_schedules(&schedules))
993 }
994 ScheduleAction::Remove { name } => {
995 let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
996 .send()
997 .await?;
998 ok_or_api_error(resp).await?;
999 Ok(format!("Removed schedule \"{name}\""))
1000 }
1001 ScheduleAction::Pause { name } => {
1002 let body = serde_json::json!({ "enabled": false });
1003 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
1004 .json(&body)
1005 .send()
1006 .await?;
1007 ok_or_api_error(resp).await?;
1008 Ok(format!("Paused schedule \"{name}\""))
1009 }
1010 ScheduleAction::Resume { name } => {
1011 let body = serde_json::json!({ "enabled": true });
1012 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
1013 .json(&body)
1014 .send()
1015 .await?;
1016 ok_or_api_error(resp).await?;
1017 Ok(format!("Resumed schedule \"{name}\""))
1018 }
1019 }
1020}
1021
/// Coverage-build stand-in for `execute_schedule`: performs no requests and
/// returns an empty string.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_schedule(
    _client: &reqwest::Client,
    _action: &ScheduleAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    let nothing = String::new();
    Ok(nothing)
}
1033
1034#[cfg_attr(coverage, allow(dead_code))]
1038fn format_secrets(secrets: &[serde_json::Value]) -> String {
1039 if secrets.is_empty() {
1040 return "No secrets configured.".into();
1041 }
1042 let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
1043 for s in secrets {
1044 let name = s["name"].as_str().unwrap_or("?");
1045 let env_display = s["env"]
1046 .as_str()
1047 .map_or_else(|| name.to_owned(), String::from);
1048 let created = s["created_at"]
1049 .as_str()
1050 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1051 lines.push(format!("{name:<24} {env_display:<24} {created}"));
1052 }
1053 lines.join("\n")
1054}
1055
1056#[cfg(not(coverage))]
1058async fn execute_secret(
1059 client: &reqwest::Client,
1060 action: &SecretAction,
1061 base: &str,
1062 token: Option<&str>,
1063) -> Result<String> {
1064 match action {
1065 SecretAction::Set { name, value, env } => {
1066 let mut body = serde_json::json!({ "value": value });
1067 if let Some(e) = env {
1068 body["env"] = serde_json::json!(e);
1069 }
1070 let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
1071 .json(&body)
1072 .send()
1073 .await?;
1074 ok_or_api_error(resp).await?;
1075 Ok(format!("Secret \"{name}\" set."))
1076 }
1077 SecretAction::List => {
1078 let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
1079 .send()
1080 .await?;
1081 let text = ok_or_api_error(resp).await?;
1082 let parsed: serde_json::Value = serde_json::from_str(&text)?;
1083 let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1084 Ok(format_secrets(secrets))
1085 }
1086 SecretAction::Delete { name } => {
1087 let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1088 .send()
1089 .await?;
1090 ok_or_api_error(resp).await?;
1091 Ok(format!("Secret \"{name}\" deleted."))
1092 }
1093 }
1094}
1095
/// Coverage-build stand-in for `execute_secret`: performs no requests and
/// returns an empty string.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_secret(
    _client: &reqwest::Client,
    _action: &SecretAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    let nothing = String::new();
    Ok(nothing)
}
1107
1108#[cfg(not(coverage))]
1110async fn execute_worktree(
1111 client: &reqwest::Client,
1112 action: &WorktreeAction,
1113 base: &str,
1114 token: Option<&str>,
1115) -> Result<String> {
1116 match action {
1117 WorktreeAction::List => {
1118 let resp = authed_get(client, format!("{base}/api/v1/sessions"), token)
1119 .send()
1120 .await?;
1121 let text = ok_or_api_error(resp).await?;
1122 let sessions: Vec<Session> = serde_json::from_str(&text)?;
1123 let wt_sessions: Vec<&Session> = sessions
1124 .iter()
1125 .filter(|s| s.worktree_path.is_some())
1126 .collect();
1127 Ok(format_worktree_sessions(&wt_sessions))
1128 }
1129 }
1130}
1131
/// Coverage-build stand-in for `execute_worktree`: performs no requests and
/// returns an empty string.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_worktree(
    _client: &reqwest::Client,
    _action: &WorktreeAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    let nothing = String::new();
    Ok(nothing)
}
1143
1144fn format_worktree_sessions(sessions: &[&Session]) -> String {
1146 if sessions.is_empty() {
1147 return "No worktree sessions.".into();
1148 }
1149 let mut lines = vec![format!(
1150 "{:<20} {:<20} {:<10} {}",
1151 "NAME", "BRANCH", "STATUS", "PATH"
1152 )];
1153 for s in sessions {
1154 let branch = s.worktree_branch.as_deref().unwrap_or("-");
1155 let path = s.worktree_path.as_deref().unwrap_or("-");
1156 lines.push(format!(
1157 "{:<20} {:<20} {:<10} {}",
1158 s.name, branch, s.status, path
1159 ));
1160 }
1161 lines.join("\n")
1162}
1163
1164#[cfg_attr(coverage, allow(dead_code))]
1166fn format_schedules(schedules: &[serde_json::Value]) -> String {
1167 if schedules.is_empty() {
1168 return "No schedules.".into();
1169 }
1170 let mut lines = vec![format!(
1171 "{:<20} {:<18} {:<8} {:<12} {}",
1172 "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1173 )];
1174 for s in schedules {
1175 let name = s["name"].as_str().unwrap_or("?");
1176 let cron = s["cron"].as_str().unwrap_or("?");
1177 let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1178 "yes"
1179 } else {
1180 "no"
1181 };
1182 let last_run = s["last_run_at"]
1183 .as_str()
1184 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1185 let node = s["target_node"].as_str().unwrap_or("local");
1186 lines.push(format!(
1187 "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1188 ));
1189 }
1190 lines.join("\n")
1191}
1192
1193#[cfg(not(coverage))]
1197async fn select_best_node(
1198 client: &reqwest::Client,
1199 base: &str,
1200 token: Option<&str>,
1201) -> Result<(String, String)> {
1202 let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1203 .send()
1204 .await?;
1205 let text = ok_or_api_error(resp).await?;
1206 let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1207
1208 let mut best: Option<(String, String, f64)> = None; for peer in &peers_resp.peers {
1212 if peer.status != pulpo_common::peer::PeerStatus::Online {
1213 continue;
1214 }
1215 let sessions = peer.session_count.unwrap_or(0);
1216 let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1217 #[allow(clippy::cast_precision_loss)]
1219 let score = sessions as f64 - (mem as f64 / 1024.0);
1220 if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1221 best = Some((peer.address.clone(), peer.name.clone(), score));
1222 }
1223 }
1224
1225 match best {
1227 Some((addr, name, _)) => Ok((addr, name)),
1228 None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1229 }
1230}
1231
/// Coverage-build stand-in for `select_best_node`: performs no requests and
/// always selects `localhost:7433` with the name `local`.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn select_best_node(
    _client: &reqwest::Client,
    _base: &str,
    _token: Option<&str>,
) -> Result<(String, String)> {
    let address = String::from("localhost:7433");
    let name = String::from("local");
    Ok((address, name))
}
1242
1243#[cfg(not(coverage))]
1246async fn ensure_daemon_running(client: &reqwest::Client, url: &str, node: &str) -> bool {
1247 if !is_localhost(node) {
1248 return true; }
1250 if client
1252 .get(format!("{url}/api/v1/health"))
1253 .timeout(std::time::Duration::from_secs(2))
1254 .send()
1255 .await
1256 .is_ok()
1257 {
1258 return true; }
1260
1261 eprintln!("pulpod is not running — starting it...");
1262
1263 let started = if cfg!(target_os = "macos") {
1265 std::process::Command::new("brew")
1266 .args(["services", "start", "pulpo"])
1267 .stdout(std::process::Stdio::null())
1268 .stderr(std::process::Stdio::null())
1269 .status()
1270 .is_ok_and(|s| s.success())
1271 } else {
1272 std::process::Command::new("systemctl")
1273 .args(["--user", "start", "pulpo"])
1274 .stdout(std::process::Stdio::null())
1275 .stderr(std::process::Stdio::null())
1276 .status()
1277 .is_ok_and(|s| s.success())
1278 };
1279
1280 if !started {
1281 if std::process::Command::new("pulpod")
1283 .stdout(std::process::Stdio::null())
1284 .stderr(std::process::Stdio::null())
1285 .spawn()
1286 .is_err()
1287 {
1288 eprintln!(
1289 "Failed to start pulpod. Install it with: brew install darioblanco/tap/pulpo"
1290 );
1291 return false;
1292 }
1293 }
1294
1295 for _ in 0..10 {
1297 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1298 if client
1299 .get(format!("{url}/api/v1/health"))
1300 .timeout(std::time::Duration::from_secs(1))
1301 .send()
1302 .await
1303 .is_ok()
1304 {
1305 eprintln!("pulpod started.");
1306 return true;
1307 }
1308 }
1309 eprintln!("pulpod did not start in time.");
1310 false
1311}
1312
/// Coverage-build stand-in for `ensure_daemon_running`: checks nothing, starts
/// nothing and always reports the daemon as running.
#[cfg(coverage)]
async fn ensure_daemon_running(_client: &reqwest::Client, _url: &str, _node: &str) -> bool {
    let running = true;
    running
}
1318
1319#[allow(clippy::too_many_lines)]
1321pub async fn execute(cli: &Cli) -> Result<String> {
1322 let client = reqwest::Client::new();
1323 let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
1324 let url = base_url(&resolved_node);
1325 let node = &resolved_node;
1326
1327 ensure_daemon_running(&client, &url, node).await;
1329
1330 let token = resolve_token(&client, &url, node, cli.token.as_deref())
1331 .await
1332 .or(peer_token);
1333
1334 if cli.command.is_none() && cli.path.is_none() {
1336 use clap::CommandFactory;
1338 let mut cmd = Cli::command();
1339 cmd.print_help()?;
1340 println!();
1341 return Ok(String::new());
1342 }
1343 if cli.command.is_none() {
1344 let path = cli.path.as_deref().unwrap_or(".");
1345 let resolved_workdir = resolve_path(path);
1346 let base_name = derive_session_name(&resolved_workdir);
1347 let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
1348 let body = serde_json::json!({
1349 "name": name,
1350 "workdir": resolved_workdir,
1351 });
1352 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1353 .json(&body)
1354 .send()
1355 .await
1356 .map_err(|e| friendly_error(&e, node))?;
1357 let text = ok_or_api_error(resp).await?;
1358 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1359 let msg = format!(
1360 "Created session \"{}\" ({})",
1361 resp.session.name, resp.session.id
1362 );
1363 let backend_id = resp
1364 .session
1365 .backend_session_id
1366 .as_deref()
1367 .unwrap_or(&resp.session.name);
1368 eprintln!("{msg}");
1369 attach_session(backend_id)?;
1372 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1373 }
1374
1375 match cli.command.as_ref().unwrap() {
1376 Commands::Attach { name } => {
1377 let resp = authed_get(
1379 &client,
1380 format!("{url}/api/v1/sessions/{name}"),
1381 token.as_deref(),
1382 )
1383 .send()
1384 .await
1385 .map_err(|e| friendly_error(&e, node))?;
1386 let text = ok_or_api_error(resp).await?;
1387 let session: Session = serde_json::from_str(&text)?;
1388 match session.status.to_string().as_str() {
1389 "lost" => {
1390 anyhow::bail!(
1391 "Session \"{name}\" is lost (agent process died). Resume it first:\n pulpo resume {name}"
1392 );
1393 }
1394 "stopped" => {
1395 anyhow::bail!(
1396 "Session \"{name}\" is {} — cannot attach to a stopped session.",
1397 session.status
1398 );
1399 }
1400 _ => {}
1401 }
1402 let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1403 attach_session(&backend_id)?;
1404 Ok(format!("Detached from session {name}."))
1405 }
1406 Commands::Input { name, text } => {
1407 let input_text = text.as_deref().unwrap_or("\n");
1408 let body = serde_json::json!({ "text": input_text });
1409 let resp = authed_post(
1410 &client,
1411 format!("{url}/api/v1/sessions/{name}/input"),
1412 token.as_deref(),
1413 )
1414 .json(&body)
1415 .send()
1416 .await
1417 .map_err(|e| friendly_error(&e, node))?;
1418 ok_or_api_error(resp).await?;
1419 Ok(format!("Sent input to session {name}."))
1420 }
1421 Commands::List { all } => {
1422 let list_url = if *all {
1423 format!("{url}/api/v1/sessions")
1424 } else {
1425 format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
1426 };
1427 let resp = authed_get(&client, list_url, token.as_deref())
1428 .send()
1429 .await
1430 .map_err(|e| friendly_error(&e, node))?;
1431 let text = ok_or_api_error(resp).await?;
1432 let sessions: Vec<Session> = serde_json::from_str(&text)?;
1433 Ok(format_sessions(&sessions))
1434 }
1435 Commands::Nodes => {
1436 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1437 .send()
1438 .await
1439 .map_err(|e| friendly_error(&e, node))?;
1440 let text = ok_or_api_error(resp).await?;
1441 let resp: PeersResponse = serde_json::from_str(&text)?;
1442 Ok(format_nodes(&resp))
1443 }
1444 Commands::Spawn {
1445 workdir,
1446 name,
1447 ink,
1448 description,
1449 detach,
1450 idle_threshold,
1451 auto,
1452 worktree,
1453 worktree_base,
1454 runtime,
1455 secret,
1456 command,
1457 } => {
1458 let cmd = if command.is_empty() {
1459 None
1460 } else {
1461 Some(command.join(" "))
1462 };
1463 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1465 std::env::current_dir()
1466 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1467 });
1468 let resolved_name = if let Some(n) = name {
1470 n.clone()
1471 } else {
1472 let base_name = derive_session_name(&resolved_workdir);
1473 deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1474 };
1475 let mut body = serde_json::json!({
1476 "name": resolved_name,
1477 "workdir": resolved_workdir,
1478 });
1479 if let Some(c) = &cmd {
1480 body["command"] = serde_json::json!(c);
1481 }
1482 if let Some(i) = ink {
1483 body["ink"] = serde_json::json!(i);
1484 }
1485 if let Some(d) = description {
1486 body["description"] = serde_json::json!(d);
1487 }
1488 if let Some(t) = idle_threshold {
1489 body["idle_threshold_secs"] = serde_json::json!(t);
1490 }
1491 if *worktree || worktree_base.is_some() {
1493 body["worktree"] = serde_json::json!(true);
1494 if let Some(base) = worktree_base {
1495 body["worktree_base"] = serde_json::json!(base);
1496 eprintln!(
1497 "Worktree: branch {resolved_name} (from {base}) in ~/.pulpo/worktrees/{resolved_name}/"
1498 );
1499 } else {
1500 eprintln!(
1501 "Worktree: branch {resolved_name} in ~/.pulpo/worktrees/{resolved_name}/"
1502 );
1503 }
1504 }
1505 if let Some(rt) = runtime {
1506 body["runtime"] = serde_json::json!(rt);
1507 }
1508 if !secret.is_empty() {
1509 body["secrets"] = serde_json::json!(secret);
1510 }
1511 let spawn_url = if *auto {
1512 let (auto_addr, auto_name) =
1513 select_best_node(&client, &url, token.as_deref()).await?;
1514 eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1515 base_url(&auto_addr)
1516 } else {
1517 url.clone()
1518 };
1519 let resp = authed_post(
1520 &client,
1521 format!("{spawn_url}/api/v1/sessions"),
1522 token.as_deref(),
1523 )
1524 .json(&body)
1525 .send()
1526 .await
1527 .map_err(|e| friendly_error(&e, node))?;
1528 let text = ok_or_api_error(resp).await?;
1529 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1530 let msg = format!(
1531 "Created session \"{}\" ({})",
1532 resp.session.name, resp.session.id
1533 );
1534 if !detach {
1535 let backend_id = resp
1536 .session
1537 .backend_session_id
1538 .as_deref()
1539 .unwrap_or(&resp.session.name);
1540 eprintln!("{msg}");
1541 if cmd.is_some() {
1544 let sid = resp.session.id.to_string();
1545 check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1546 }
1547 attach_session(backend_id)?;
1548 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1549 }
1550 Ok(msg)
1551 }
1552 Commands::Stop { names, purge } => {
1553 let mut results = Vec::new();
1554 for name in names {
1555 let query = if *purge { "?purge=true" } else { "" };
1556 let resp = authed_post(
1557 &client,
1558 format!("{url}/api/v1/sessions/{name}/stop{query}"),
1559 token.as_deref(),
1560 )
1561 .send()
1562 .await
1563 .map_err(|e| friendly_error(&e, node))?;
1564 let action = if *purge {
1565 "stopped and purged"
1566 } else {
1567 "stopped"
1568 };
1569 match ok_or_api_error(resp).await {
1570 Ok(_) => results.push(format!("Session {name} {action}.")),
1571 Err(e) => results.push(format!("Error stopping {name}: {e}")),
1572 }
1573 }
1574 Ok(results.join("\n"))
1575 }
1576 Commands::Cleanup => {
1577 let resp = authed_post(
1578 &client,
1579 format!("{url}/api/v1/sessions/cleanup"),
1580 token.as_deref(),
1581 )
1582 .send()
1583 .await
1584 .map_err(|e| friendly_error(&e, node))?;
1585 let text = ok_or_api_error(resp).await?;
1586 let result: serde_json::Value = serde_json::from_str(&text)?;
1587 let count = result["deleted"].as_u64().unwrap_or(0);
1588 if count == 0 {
1589 Ok("No stopped or lost sessions to clean up.".into())
1590 } else {
1591 Ok(format!("Cleaned up {count} session(s)."))
1592 }
1593 }
1594 Commands::Logs {
1595 name,
1596 lines,
1597 follow,
1598 } => {
1599 if *follow {
1600 let mut stdout = std::io::stdout();
1601 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1602 .await
1603 .map_err(|e| {
1604 match e.downcast::<reqwest::Error>() {
1606 Ok(re) => friendly_error(&re, node),
1607 Err(other) => other,
1608 }
1609 })?;
1610 Ok(String::new())
1611 } else {
1612 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1613 .await
1614 .map_err(|e| match e.downcast::<reqwest::Error>() {
1615 Ok(re) => friendly_error(&re, node),
1616 Err(other) => other,
1617 })?;
1618 Ok(output)
1619 }
1620 }
1621 Commands::Interventions { name } => {
1622 let resp = authed_get(
1623 &client,
1624 format!("{url}/api/v1/sessions/{name}/interventions"),
1625 token.as_deref(),
1626 )
1627 .send()
1628 .await
1629 .map_err(|e| friendly_error(&e, node))?;
1630 let text = ok_or_api_error(resp).await?;
1631 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1632 Ok(format_interventions(&events))
1633 }
1634 Commands::Ui => {
1635 let dashboard = base_url(node);
1636 open_browser(&dashboard)?;
1637 Ok(format!("Opening {dashboard}"))
1638 }
1639 Commands::Resume { name } => {
1640 let resp = authed_post(
1641 &client,
1642 format!("{url}/api/v1/sessions/{name}/resume"),
1643 token.as_deref(),
1644 )
1645 .send()
1646 .await
1647 .map_err(|e| friendly_error(&e, node))?;
1648 let text = ok_or_api_error(resp).await?;
1649 let session: Session = serde_json::from_str(&text)?;
1650 let backend_id = session
1651 .backend_session_id
1652 .as_deref()
1653 .unwrap_or(&session.name);
1654 eprintln!("Resumed session \"{}\"", session.name);
1655 let sid = session.id.to_string();
1656 check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1657 attach_session(backend_id)?;
1658 Ok(format!("Detached from session \"{}\".", session.name))
1659 }
1660 Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1661 .await
1662 .map_err(|e| match e.downcast::<reqwest::Error>() {
1663 Ok(re) => friendly_error(&re, node),
1664 Err(other) => other,
1665 }),
1666 Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
1667 .await
1668 .map_err(|e| match e.downcast::<reqwest::Error>() {
1669 Ok(re) => friendly_error(&re, node),
1670 Err(other) => other,
1671 }),
1672 Commands::Worktree { action } => execute_worktree(&client, action, &url, token.as_deref())
1673 .await
1674 .map_err(|e| match e.downcast::<reqwest::Error>() {
1675 Ok(re) => friendly_error(&re, node),
1676 Err(other) => other,
1677 }),
1678 }
1679}
1680
1681#[cfg(test)]
1682mod tests {
1683 use super::*;
1684
1685 fn repo_session(workdir: &str, branch: Option<&str>) -> Session {
1687 Session {
1688 id: uuid::Uuid::nil(),
1689 name: "test".into(),
1690 workdir: workdir.into(),
1691 command: "echo".into(),
1692 description: None,
1693 status: pulpo_common::session::SessionStatus::Active,
1694 exit_code: None,
1695 backend_session_id: None,
1696 output_snapshot: None,
1697 metadata: None,
1698 ink: None,
1699 intervention_code: None,
1700 intervention_reason: None,
1701 intervention_at: None,
1702 last_output_at: None,
1703 idle_since: None,
1704 idle_threshold_secs: None,
1705 worktree_path: None,
1706 worktree_branch: None,
1707 git_branch: branch.map(Into::into),
1708 git_commit: None,
1709 git_files_changed: None,
1710 git_insertions: None,
1711 git_deletions: None,
1712 git_ahead: None,
1713 runtime: Runtime::Tmux,
1714 created_at: chrono::Utc::now(),
1715 updated_at: chrono::Utc::now(),
1716 }
1717 }
1718
// `base_url` turns a bare `host:port` node address into an `http://` URL.
#[test]
fn test_base_url() {
    let cases = [
        ("localhost:7433", "http://localhost:7433"),
        ("my-machine:9999", "http://my-machine:9999"),
    ];
    for (node, expected) in cases {
        assert_eq!(base_url(node), expected);
    }
}
1724
// `list` parses to the List subcommand and leaves the node at its default.
#[test]
fn test_cli_parse_list() {
    let Cli { node, command, .. } = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    assert_eq!(node, "localhost:7433");
    let is_list = matches!(command, Some(Commands::List { .. }));
    assert!(is_list);
}
1731
// `nodes` parses to the argument-less Nodes subcommand.
#[test]
fn test_cli_parse_nodes() {
    let argv = ["pulpo", "nodes"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_nodes = matches!(parsed.command, Some(Commands::Nodes));
    assert!(is_nodes);
}
1737
// `ui` parses to the argument-less Ui subcommand.
#[test]
fn test_cli_parse_ui() {
    let argv = ["pulpo", "ui"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_ui = matches!(parsed.command, Some(Commands::Ui));
    assert!(is_ui);
}
1743
// When `--node` precedes it, the trailing `ui` token lands in the positional
// `path` slot instead of selecting a subcommand.
// NOTE(review): presumably a consequence of `args_conflicts_with_subcommands`
// on `Cli` — confirm against clap's documentation.
#[test]
fn test_cli_parse_ui_custom_node() {
    let argv = ["pulpo", "--node", "mac-mini:7433", "ui"];
    let Cli { node, path, .. } = Cli::try_parse_from(argv).unwrap();
    assert_eq!(node, "mac-mini:7433");
    assert_eq!(path.as_deref(), Some("ui"));
}
1751
// The opener command receives the URL as its only argument and uses the
// platform launcher (`open` on macOS, `xdg-open` on Linux).
#[test]
fn test_build_open_command() {
    let target = "http://localhost:7433";
    let cmd = build_open_command(target);

    #[cfg(target_os = "macos")]
    assert_eq!(cmd.get_program(), "open");
    #[cfg(target_os = "linux")]
    assert_eq!(cmd.get_program(), "xdg-open");

    let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
    assert_eq!(args, vec![target]);
}
1762
// A full spawn invocation: positional name, `--workdir`, and a command after `--`.
#[test]
fn test_cli_parse_spawn() {
    let argv = [
        "pulpo",
        "spawn",
        "my-task",
        "--workdir",
        "/tmp/repo",
        "--",
        "claude",
        "-p",
        "Fix the bug",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let expected_command = ["claude", "-p", "Fix the bug"];
    let parsed_as_expected = matches!(
        &parsed.command,
        Some(Commands::Spawn { name: Some(name), workdir: Some(workdir), command, .. })
            if name == "my-task" && workdir == "/tmp/repo" && *command == expected_command
    );
    assert!(parsed_as_expected);
}
1784
// `--ink` is captured on the spawn subcommand.
#[test]
fn test_cli_parse_spawn_with_ink() {
    let argv = ["pulpo", "spawn", "my-task", "--ink", "coder"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let has_ink = matches!(
        &parsed.command,
        Some(Commands::Spawn { ink: Some(ink), .. }) if ink == "coder"
    );
    assert!(has_ink);
}
1793
// `--description` is captured on the spawn subcommand, spaces included.
#[test]
fn test_cli_parse_spawn_with_description() {
    let argv = ["pulpo", "spawn", "my-task", "--description", "Fix the bug"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let has_description = matches!(
        &parsed.command,
        Some(Commands::Spawn { description: Some(text), .. }) if text == "Fix the bug"
    );
    assert!(has_description);
}
1804
// The session name is the first positional; everything after `--` is the command.
#[test]
fn test_cli_parse_spawn_name_positional() {
    let argv = ["pulpo", "spawn", "portal", "--", "echo", "hello"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let parsed_as_expected = matches!(
        &parsed.command,
        Some(Commands::Spawn { name: Some(name), command, .. })
            if name == "portal" && *command == ["echo", "hello"]
    );
    assert!(parsed_as_expected);
}
1814
// Without a trailing `--` section the spawn command vector stays empty.
#[test]
fn test_cli_parse_spawn_no_command() {
    let argv = ["pulpo", "spawn", "my-task"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let command_is_empty = matches!(
        &parsed.command,
        Some(Commands::Spawn { command, .. }) if command.is_empty()
    );
    assert!(command_is_empty);
}
1823
// `--idle-threshold 0` is accepted and preserved as an explicit zero.
#[test]
fn test_cli_parse_spawn_idle_threshold() {
    let argv = ["pulpo", "spawn", "my-task", "--idle-threshold", "0"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let threshold_is_zero = matches!(
        &parsed.command,
        Some(Commands::Spawn { idle_threshold: Some(0), .. })
    );
    assert!(threshold_is_zero);
}
1833
// A non-zero `--idle-threshold` value is parsed as given.
#[test]
fn test_cli_parse_spawn_idle_threshold_60() {
    let argv = ["pulpo", "spawn", "my-task", "--idle-threshold", "60"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let threshold_is_sixty = matches!(
        &parsed.command,
        Some(Commands::Spawn { idle_threshold: Some(60), .. })
    );
    assert!(threshold_is_sixty);
}
1843
// Repeating `--secret` accumulates the names in the order given.
#[test]
fn test_cli_parse_spawn_secrets() {
    let argv = [
        "pulpo",
        "spawn",
        "my-task",
        "--secret",
        "GITHUB_TOKEN",
        "--secret",
        "NPM_TOKEN",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let secrets_collected = matches!(
        &parsed.command,
        Some(Commands::Spawn { secret, .. }) if *secret == ["GITHUB_TOKEN", "NPM_TOKEN"]
    );
    assert!(secrets_collected);
}
1861
// With no `--secret` flag the secret list is empty.
#[test]
fn test_cli_parse_spawn_no_secrets() {
    let argv = ["pulpo", "spawn", "my-task"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let no_secrets = matches!(
        &parsed.command,
        Some(Commands::Spawn { secret, .. }) if secret.is_empty()
    );
    assert!(no_secrets);
}
1870
// `secret set NAME VALUE --env VAR` captures all three pieces.
#[test]
fn test_cli_parse_secret_set_with_env() {
    let argv = [
        "pulpo",
        "secret",
        "set",
        "GH_WORK",
        "token123",
        "--env",
        "GITHUB_TOKEN",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let parsed_as_expected = matches!(
        &parsed.command,
        Some(Commands::Secret { action: SecretAction::Set { name, value, env: Some(env) } })
            if name == "GH_WORK" && value == "token123" && env == "GITHUB_TOKEN"
    );
    assert!(parsed_as_expected);
}
1889
// Omitting `--env` on `secret set` leaves the env override unset.
#[test]
fn test_cli_parse_secret_set_without_env() {
    let argv = ["pulpo", "secret", "set", "MY_KEY", "val"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let parsed_as_expected = matches!(
        &parsed.command,
        Some(Commands::Secret { action: SecretAction::Set { name, value, env: None } })
            if name == "MY_KEY" && value == "val"
    );
    assert!(parsed_as_expected);
}
1899
// `--worktree` sets the worktree flag on spawn.
#[test]
fn test_cli_parse_spawn_worktree() {
    let argv = ["pulpo", "spawn", "my-task", "--worktree"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let worktree_enabled = matches!(
        &parsed.command,
        Some(Commands::Spawn { worktree: true, .. })
    );
    assert!(worktree_enabled);
}
1908
// `--worktree` together with `--worktree-base` records both the flag and the base.
#[test]
fn test_cli_parse_spawn_worktree_base() {
    let argv = [
        "pulpo",
        "spawn",
        "my-task",
        "--worktree",
        "--worktree-base",
        "main",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let parsed_as_expected = matches!(
        &parsed.command,
        Some(Commands::Spawn { worktree: true, worktree_base: Some(base), .. })
            if base == "main"
    );
    assert!(parsed_as_expected);
}
1926
// `--worktree-base` is accepted on its own, without `--worktree`. Only the
// parsed base is checked here; the parser is not asserted to set the flag.
#[test]
fn test_cli_parse_spawn_worktree_base_implies_worktree() {
    let argv = ["pulpo", "spawn", "my-task", "--worktree-base", "develop"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let base_recorded = matches!(
        &parsed.command,
        Some(Commands::Spawn { worktree_base: Some(base), .. }) if base == "develop"
    );
    assert!(base_recorded);
}
1938
// `worktree list` parses to the worktree List action.
#[test]
fn test_cli_parse_worktree_list() {
    let argv = ["pulpo", "worktree", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_worktree_list = matches!(
        parsed.command,
        Some(Commands::Worktree { action: WorktreeAction::List })
    );
    assert!(is_worktree_list);
}
1949
// `wt` is accepted as an alias for the `worktree` subcommand.
#[test]
fn test_cli_parse_wt_alias() {
    let argv = ["pulpo", "wt", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_worktree_list = matches!(
        parsed.command,
        Some(Commands::Worktree { action: WorktreeAction::List })
    );
    assert!(is_worktree_list);
}
1960
// `ls` is accepted as an alias for the worktree `list` action.
#[test]
fn test_cli_parse_worktree_list_ls_alias() {
    let argv = ["pulpo", "wt", "ls"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_worktree_list = matches!(
        parsed.command,
        Some(Commands::Worktree { action: WorktreeAction::List })
    );
    assert!(is_worktree_list);
}
1971
// An empty session slice renders the fixed "nothing to show" message.
#[test]
fn test_format_worktree_sessions_empty() {
    assert_eq!(format_worktree_sessions(&[]), "No worktree sessions.");
}
1977
// A worktree session is rendered with a header row plus its name, status,
// and worktree path.
#[test]
fn test_format_worktree_sessions_with_data() {
    // Start from the shared fixture and override only what this test cares
    // about; all remaining fields keep the fixture's values.
    let session = Session {
        name: "fix-auth".into(),
        command: "claude -p 'fix auth'".into(),
        worktree_path: Some("/home/user/.pulpo/worktrees/fix-auth".into()),
        worktree_branch: Some("fix-auth".into()),
        ..repo_session("/tmp/repo", None)
    };
    let output = format_worktree_sessions(&[&session]);

    assert!(output.contains("fix-auth"), "should show name: {output}");
    assert!(output.contains("active"), "should show status: {output}");
    assert!(
        output.contains("/home/user/.pulpo/worktrees/fix-auth"),
        "should show path: {output}"
    );
    assert!(output.contains("BRANCH"), "should have header: {output}");
}
2024
// A session that has a worktree path but no recorded branch should render a
// dash in place of the branch.
//
// The fixture name and path are deliberately dash-free. Previously they were
// "old-session", so `output.contains('-')` was satisfied by the fixture
// itself and the assertion could never fail.
// NOTE(review): other parts of the rendered table (e.g. a separator row)
// could still contribute a '-'; tighten further if the exact layout is known.
#[test]
fn test_format_worktree_sessions_no_branch() {
    let session = Session {
        name: "old_session".into(),
        worktree_path: Some("/home/user/.pulpo/worktrees/old_session".into()),
        worktree_branch: None,
        // The shared fixture supplies workdir "/tmp", command "echo", an
        // active status, and `None` for every other optional field.
        ..repo_session("/tmp", None)
    };
    let output = format_worktree_sessions(&[&session]);
    assert!(
        output.contains('-'),
        "branch should show dash when None: {output}"
    );
}
2068
// `--detach` sets the detach flag on spawn.
#[test]
fn test_cli_parse_spawn_detach() {
    let argv = ["pulpo", "spawn", "my-task", "--detach"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let detached = matches!(&parsed.command, Some(Commands::Spawn { detach: true, .. }));
    assert!(detached);
}
2077
// `-d` is the short form of `--detach`.
#[test]
fn test_cli_parse_spawn_detach_short() {
    let argv = ["pulpo", "spawn", "my-task", "-d"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let detached = matches!(&parsed.command, Some(Commands::Spawn { detach: true, .. }));
    assert!(detached);
}
2086
// Spawn does not detach unless asked to.
#[test]
fn test_cli_parse_spawn_detach_default() {
    let argv = ["pulpo", "spawn", "my-task"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let stays_attached = matches!(
        &parsed.command,
        Some(Commands::Spawn { detach: false, .. })
    );
    assert!(stays_attached);
}
2095
// `logs NAME` defaults to 100 lines and no follow.
#[test]
fn test_cli_parse_logs() {
    let argv = ["pulpo", "logs", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let parsed_as_expected = matches!(
        &parsed.command,
        Some(Commands::Logs { name, lines: 100, follow: false }) if name == "my-session"
    );
    assert!(parsed_as_expected);
}
2104
// `--lines` overrides the default line count.
#[test]
fn test_cli_parse_logs_with_lines() {
    let argv = ["pulpo", "logs", "my-session", "--lines", "50"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let parsed_as_expected = matches!(
        &parsed.command,
        Some(Commands::Logs { name, lines: 50, follow: false }) if name == "my-session"
    );
    assert!(parsed_as_expected);
}
2113
// `--follow` turns on log following.
#[test]
fn test_cli_parse_logs_follow() {
    let argv = ["pulpo", "logs", "my-session", "--follow"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let following = matches!(
        &parsed.command,
        Some(Commands::Logs { name, follow: true, .. }) if name == "my-session"
    );
    assert!(following);
}
2122
// `-f` is the short form of `--follow`.
#[test]
fn test_cli_parse_logs_follow_short() {
    let argv = ["pulpo", "logs", "my-session", "-f"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let following = matches!(
        &parsed.command,
        Some(Commands::Logs { name, follow: true, .. }) if name == "my-session"
    );
    assert!(following);
}
2131
// `stop NAME` collects the name and does not purge by default.
#[test]
fn test_cli_parse_stop() {
    let argv = ["pulpo", "stop", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let parsed_as_expected = matches!(
        &parsed.command,
        Some(Commands::Stop { names, purge: false }) if *names == ["my-session"]
    );
    assert!(parsed_as_expected);
}
2140
// Both the long `--purge` and short `-p` spellings enable purging.
#[test]
fn test_cli_parse_stop_purge() {
    for flag in ["--purge", "-p"] {
        let parsed = Cli::try_parse_from(["pulpo", "stop", "my-session", flag]).unwrap();
        let purging = matches!(
            &parsed.command,
            Some(Commands::Stop { names, purge: true }) if *names == ["my-session"]
        );
        assert!(purging);
    }
}
2155
// `kill` is accepted as an alias for `stop`.
#[test]
fn test_cli_parse_kill_alias() {
    let argv = ["pulpo", "kill", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let parsed_as_stop = matches!(
        &parsed.command,
        Some(Commands::Stop { names, purge: false }) if *names == ["my-session"]
    );
    assert!(parsed_as_stop);
}
2164
// `resume NAME` captures the session name.
#[test]
fn test_cli_parse_resume() {
    let argv = ["pulpo", "resume", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let parsed_as_expected = matches!(
        &parsed.command,
        Some(Commands::Resume { name }) if name == "my-session"
    );
    assert!(parsed_as_expected);
}
2173
// `input NAME TEXT` captures both the session name and the text to send.
#[test]
fn test_cli_parse_input() {
    let argv = ["pulpo", "input", "my-session", "yes"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let parsed_as_expected = matches!(
        &parsed.command,
        Some(Commands::Input { name, text: Some(text) })
            if name == "my-session" && text == "yes"
    );
    assert!(parsed_as_expected);
}
2182
// The text argument of `input` is optional.
#[test]
fn test_cli_parse_input_no_text() {
    let argv = ["pulpo", "input", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let parsed_as_expected = matches!(
        &parsed.command,
        Some(Commands::Input { name, text: None }) if name == "my-session"
    );
    assert!(parsed_as_expected);
}
2191
// `i` is accepted as an alias for `input`.
#[test]
fn test_cli_parse_input_alias() {
    let argv = ["pulpo", "i", "my-session", "y"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let parsed_as_expected = matches!(
        &parsed.command,
        Some(Commands::Input { name, text: Some(text) })
            if name == "my-session" && text == "y"
    );
    assert!(parsed_as_expected);
}
2200
// `--node` overrides the default address; the token that follows it is
// captured as the positional `path` rather than as a subcommand.
// NOTE(review): presumably a consequence of `args_conflicts_with_subcommands`
// on `Cli` — confirm against clap's documentation.
#[test]
fn test_cli_parse_custom_node() {
    let argv = ["pulpo", "--node", "win-pc:8080", "list"];
    let Cli { node, path, .. } = Cli::try_parse_from(argv).unwrap();
    assert_eq!(node, "win-pc:8080");
    assert_eq!(path.as_deref(), Some("list"));
}
2208
// `--version` surfaces from `try_parse_from` as a `DisplayVersion` error
// rather than a parsed `Cli`.
#[test]
fn test_cli_version() {
    use clap::error::ErrorKind;

    let kind = Cli::try_parse_from(["pulpo", "--version"])
        .unwrap_err()
        .kind();
    assert_eq!(kind, ErrorKind::DisplayVersion);
}
2216
// A bare `pulpo` invocation parses successfully with neither a subcommand
// nor a path.
#[test]
fn test_cli_parse_no_subcommand_succeeds() {
    let Cli { command, path, .. } = Cli::try_parse_from(["pulpo"]).unwrap();
    assert!(command.is_none());
    assert!(path.is_none());
}
2223
// The derived `Debug` output of `Cli` mentions the parsed subcommand.
#[test]
fn test_cli_debug() {
    let rendered = format!("{:?}", Cli::try_parse_from(["pulpo", "list"]).unwrap());
    assert!(rendered.contains("List"));
}
2230
// The derived `Debug` output of a `Commands` variant has the expected shape.
#[test]
fn test_commands_debug() {
    let rendered = format!("{:?}", Commands::List { all: false });
    assert_eq!(rendered, "List { all: false }");
}
2236
/// Canned JSON for a single active session named `repo`, served by the test
/// server for session lookups and resumes and embedded in the create-session
/// reply. It carries no worktree, git, or runtime keys.
const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2239
2240 fn test_create_response_json() -> String {
2242 format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
2243 }
2244
2245 async fn start_test_server() -> String {
2247 use axum::http::StatusCode;
2248 use axum::{
2249 Json, Router,
2250 routing::{get, post},
2251 };
2252
2253 let create_json = test_create_response_json();
2254
2255 let app = Router::new()
2256 .route(
2257 "/api/v1/sessions",
2258 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
2259 (StatusCode::CREATED, create_json.clone())
2260 }),
2261 )
2262 .route(
2263 "/api/v1/sessions/{id}",
2264 get(|| async { TEST_SESSION_JSON.to_owned() }),
2265 )
2266 .route(
2267 "/api/v1/sessions/{id}/stop",
2268 post(|| async { StatusCode::NO_CONTENT }),
2269 )
2270 .route(
2271 "/api/v1/sessions/{id}/output",
2272 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
2273 )
2274 .route(
2275 "/api/v1/peers",
2276 get(|| async {
2277 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
2278 }),
2279 )
2280 .route(
2281 "/api/v1/sessions/{id}/resume",
2282 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
2283 )
2284 .route(
2285 "/api/v1/sessions/{id}/interventions",
2286 get(|| async { "[]".to_owned() }),
2287 )
2288 .route(
2289 "/api/v1/sessions/{id}/input",
2290 post(|| async { StatusCode::NO_CONTENT }),
2291 )
2292 .route(
2293 "/api/v1/schedules",
2294 get(|| async { Json::<Vec<()>>(vec![]) })
2295 .post(|| async { StatusCode::CREATED }),
2296 )
2297 .route(
2298 "/api/v1/schedules/{id}",
2299 axum::routing::put(|| async { StatusCode::OK })
2300 .delete(|| async { StatusCode::NO_CONTENT }),
2301 );
2302
2303 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2304 let addr = listener.local_addr().unwrap();
2305 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2306 format!("127.0.0.1:{}", addr.port())
2307 }
2308
2309 #[tokio::test]
2310 async fn test_execute_list_success() {
2311 let node = start_test_server().await;
2312 let cli = Cli {
2313 node,
2314 token: None,
2315 command: Some(Commands::List { all: false }),
2316 path: None,
2317 };
2318 let result = execute(&cli).await.unwrap();
2319 assert_eq!(result, "No sessions.");
2320 }
2321
2322 #[tokio::test]
2323 async fn test_execute_nodes_success() {
2324 let node = start_test_server().await;
2325 let cli = Cli {
2326 node,
2327 token: None,
2328 command: Some(Commands::Nodes),
2329 path: None,
2330 };
2331 let result = execute(&cli).await.unwrap();
2332 assert!(result.contains("test"));
2333 assert!(result.contains("(local)"));
2334 assert!(result.contains("NAME"));
2335 }
2336
2337 #[tokio::test]
2338 async fn test_execute_spawn_success() {
2339 let node = start_test_server().await;
2340 let cli = Cli {
2341 node,
2342 token: None,
2343 command: Some(Commands::Spawn {
2344 name: Some("test".into()),
2345 workdir: Some("/tmp/repo".into()),
2346 ink: None,
2347 description: None,
2348 detach: true,
2349 idle_threshold: None,
2350 auto: false,
2351 worktree: false,
2352 worktree_base: None,
2353 runtime: None,
2354 secret: vec![],
2355 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2356 }),
2357 path: None,
2358 };
2359 let result = execute(&cli).await.unwrap();
2360 assert!(result.contains("Created session"));
2361 assert!(result.contains("repo"));
2362 }
2363
2364 #[tokio::test]
2365 async fn test_execute_spawn_with_all_flags() {
2366 let node = start_test_server().await;
2367 let cli = Cli {
2368 node,
2369 token: None,
2370 command: Some(Commands::Spawn {
2371 name: Some("test".into()),
2372 workdir: Some("/tmp/repo".into()),
2373 ink: Some("coder".into()),
2374 description: Some("Fix the bug".into()),
2375 detach: true,
2376 idle_threshold: None,
2377 auto: false,
2378 worktree: false,
2379 worktree_base: None,
2380 runtime: None,
2381 secret: vec![],
2382 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2383 }),
2384 path: None,
2385 };
2386 let result = execute(&cli).await.unwrap();
2387 assert!(result.contains("Created session"));
2388 }
2389
2390 #[tokio::test]
2391 async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
2392 let node = start_test_server().await;
2393 let cli = Cli {
2394 node,
2395 token: None,
2396 command: Some(Commands::Spawn {
2397 name: Some("full-opts".into()),
2398 workdir: Some("/tmp/repo".into()),
2399 ink: Some("coder".into()),
2400 description: Some("Full options".into()),
2401 detach: true,
2402 idle_threshold: Some(120),
2403 auto: false,
2404 worktree: true,
2405 worktree_base: None,
2406 runtime: Some("docker".into()),
2407 secret: vec![],
2408 command: vec!["claude".into()],
2409 }),
2410 path: None,
2411 };
2412 let result = execute(&cli).await.unwrap();
2413 assert!(result.contains("Created session"));
2414 }
2415
2416 #[tokio::test]
2417 async fn test_execute_spawn_no_name_derives_from_workdir() {
2418 let node = start_test_server().await;
2419 let cli = Cli {
2420 node,
2421 token: None,
2422 command: Some(Commands::Spawn {
2423 name: None,
2424 workdir: Some("/tmp/my-project".into()),
2425 ink: None,
2426 description: None,
2427 detach: true,
2428 idle_threshold: None,
2429 auto: false,
2430 worktree: false,
2431 worktree_base: None,
2432 runtime: None,
2433 secret: vec![],
2434 command: vec!["echo".into(), "hello".into()],
2435 }),
2436 path: None,
2437 };
2438 let result = execute(&cli).await.unwrap();
2439 assert!(result.contains("Created session"));
2440 }
2441
2442 #[tokio::test]
2443 async fn test_execute_spawn_no_command() {
2444 let node = start_test_server().await;
2445 let cli = Cli {
2446 node,
2447 token: None,
2448 command: Some(Commands::Spawn {
2449 name: Some("test".into()),
2450 workdir: Some("/tmp/repo".into()),
2451 ink: None,
2452 description: None,
2453 detach: true,
2454 idle_threshold: None,
2455 auto: false,
2456 worktree: false,
2457 worktree_base: None,
2458 runtime: None,
2459 secret: vec![],
2460 command: vec![],
2461 }),
2462 path: None,
2463 };
2464 let result = execute(&cli).await.unwrap();
2465 assert!(result.contains("Created session"));
2466 }
2467
2468 #[tokio::test]
2469 async fn test_execute_spawn_with_name() {
2470 let node = start_test_server().await;
2471 let cli = Cli {
2472 node,
2473 token: None,
2474 command: Some(Commands::Spawn {
2475 name: Some("my-task".into()),
2476 workdir: Some("/tmp/repo".into()),
2477 ink: None,
2478 description: None,
2479 detach: true,
2480 idle_threshold: None,
2481 auto: false,
2482 worktree: false,
2483 worktree_base: None,
2484 runtime: None,
2485 secret: vec![],
2486 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2487 }),
2488 path: None,
2489 };
2490 let result = execute(&cli).await.unwrap();
2491 assert!(result.contains("Created session"));
2492 }
2493
2494 #[tokio::test]
2495 async fn test_execute_spawn_auto_attach() {
2496 let node = start_test_server().await;
2497 let cli = Cli {
2498 node,
2499 token: None,
2500 command: Some(Commands::Spawn {
2501 name: Some("test".into()),
2502 workdir: Some("/tmp/repo".into()),
2503 ink: None,
2504 description: None,
2505 detach: false,
2506 idle_threshold: None,
2507 auto: false,
2508 worktree: false,
2509 worktree_base: None,
2510 runtime: None,
2511 secret: vec![],
2512 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2513 }),
2514 path: None,
2515 };
2516 let result = execute(&cli).await.unwrap();
2517 assert!(result.contains("Detached from session"));
2519 }
2520
2521 #[tokio::test]
2522 async fn test_execute_stop_success() {
2523 let node = start_test_server().await;
2524 let cli = Cli {
2525 node,
2526 token: None,
2527 command: Some(Commands::Stop {
2528 names: vec!["test-session".into()],
2529 purge: false,
2530 }),
2531 path: None,
2532 };
2533 let result = execute(&cli).await.unwrap();
2534 assert!(result.contains("stopped"));
2535 assert!(!result.contains("purged"));
2536 }
2537
2538 #[tokio::test]
2539 async fn test_execute_stop_with_purge() {
2540 let node = start_test_server().await;
2541 let cli = Cli {
2542 node,
2543 token: None,
2544 command: Some(Commands::Stop {
2545 names: vec!["test-session".into()],
2546 purge: true,
2547 }),
2548 path: None,
2549 };
2550 let result = execute(&cli).await.unwrap();
2551 assert!(result.contains("stopped and purged"));
2552 }
2553
2554 #[tokio::test]
2555 async fn test_execute_logs_success() {
2556 let node = start_test_server().await;
2557 let cli = Cli {
2558 node,
2559 token: None,
2560 command: Some(Commands::Logs {
2561 name: "test-session".into(),
2562 lines: 50,
2563 follow: false,
2564 }),
2565 path: None,
2566 };
2567 let result = execute(&cli).await.unwrap();
2568 assert!(result.contains("test output"));
2569 }
2570
2571 #[tokio::test]
2572 async fn test_execute_list_connection_refused() {
2573 let cli = Cli {
2574 node: "localhost:1".into(),
2575 token: None,
2576 command: Some(Commands::List { all: false }),
2577 path: None,
2578 };
2579 let result = execute(&cli).await;
2580 let err = result.unwrap_err().to_string();
2581 assert!(
2582 err.contains("Could not connect to pulpod"),
2583 "Expected friendly error, got: {err}"
2584 );
2585 assert!(err.contains("localhost:1"));
2586 }
2587
2588 #[tokio::test]
2589 async fn test_execute_nodes_connection_refused() {
2590 let cli = Cli {
2591 node: "localhost:1".into(),
2592 token: None,
2593 command: Some(Commands::Nodes),
2594 path: None,
2595 };
2596 let result = execute(&cli).await;
2597 let err = result.unwrap_err().to_string();
2598 assert!(err.contains("Could not connect to pulpod"));
2599 }
2600
2601 #[tokio::test]
2602 async fn test_execute_stop_error_response() {
2603 use axum::{Router, http::StatusCode, routing::post};
2604
2605 let app = Router::new().route(
2606 "/api/v1/sessions/{id}/stop",
2607 post(|| async {
2608 (
2609 StatusCode::NOT_FOUND,
2610 "{\"error\":\"session not found: test-session\"}",
2611 )
2612 }),
2613 );
2614 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2615 let addr = listener.local_addr().unwrap();
2616 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2617 let node = format!("127.0.0.1:{}", addr.port());
2618
2619 let cli = Cli {
2620 node,
2621 token: None,
2622 command: Some(Commands::Stop {
2623 names: vec!["test-session".into()],
2624 purge: false,
2625 }),
2626 path: None,
2627 };
2628 let result = execute(&cli).await.unwrap();
2629 assert!(result.contains("Error stopping test-session"), "{result}");
2630 }
2631
2632 #[tokio::test]
2633 async fn test_execute_logs_error_response() {
2634 use axum::{Router, http::StatusCode, routing::get};
2635
2636 let app = Router::new().route(
2637 "/api/v1/sessions/{id}/output",
2638 get(|| async {
2639 (
2640 StatusCode::NOT_FOUND,
2641 "{\"error\":\"session not found: ghost\"}",
2642 )
2643 }),
2644 );
2645 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2646 let addr = listener.local_addr().unwrap();
2647 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2648 let node = format!("127.0.0.1:{}", addr.port());
2649
2650 let cli = Cli {
2651 node,
2652 token: None,
2653 command: Some(Commands::Logs {
2654 name: "ghost".into(),
2655 lines: 50,
2656 follow: false,
2657 }),
2658 path: None,
2659 };
2660 let err = execute(&cli).await.unwrap_err();
2661 assert_eq!(err.to_string(), "session not found: ghost");
2662 }
2663
2664 #[tokio::test]
2665 async fn test_execute_resume_error_response() {
2666 use axum::{Router, http::StatusCode, routing::post};
2667
2668 let app = Router::new().route(
2669 "/api/v1/sessions/{id}/resume",
2670 post(|| async {
2671 (
2672 StatusCode::BAD_REQUEST,
2673 "{\"error\":\"session is not lost (status: active)\"}",
2674 )
2675 }),
2676 );
2677 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2678 let addr = listener.local_addr().unwrap();
2679 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2680 let node = format!("127.0.0.1:{}", addr.port());
2681
2682 let cli = Cli {
2683 node,
2684 token: None,
2685 command: Some(Commands::Resume {
2686 name: "test-session".into(),
2687 }),
2688 path: None,
2689 };
2690 let err = execute(&cli).await.unwrap_err();
2691 assert_eq!(err.to_string(), "session is not lost (status: active)");
2692 }
2693
2694 #[tokio::test]
2695 async fn test_execute_spawn_error_response() {
2696 use axum::{Router, http::StatusCode, routing::post};
2697
2698 let app = Router::new().route(
2699 "/api/v1/sessions",
2700 post(|| async {
2701 (
2702 StatusCode::INTERNAL_SERVER_ERROR,
2703 "{\"error\":\"failed to spawn session\"}",
2704 )
2705 }),
2706 );
2707 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2708 let addr = listener.local_addr().unwrap();
2709 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2710 let node = format!("127.0.0.1:{}", addr.port());
2711
2712 let cli = Cli {
2713 node,
2714 token: None,
2715 command: Some(Commands::Spawn {
2716 name: Some("test".into()),
2717 workdir: Some("/tmp/repo".into()),
2718 ink: None,
2719 description: None,
2720 detach: true,
2721 idle_threshold: None,
2722 auto: false,
2723 worktree: false,
2724 worktree_base: None,
2725 runtime: None,
2726 secret: vec![],
2727 command: vec!["test".into()],
2728 }),
2729 path: None,
2730 };
2731 let err = execute(&cli).await.unwrap_err();
2732 assert_eq!(err.to_string(), "failed to spawn session");
2733 }
2734
2735 #[tokio::test]
2736 async fn test_execute_interventions_error_response() {
2737 use axum::{Router, http::StatusCode, routing::get};
2738
2739 let app = Router::new().route(
2740 "/api/v1/sessions/{id}/interventions",
2741 get(|| async {
2742 (
2743 StatusCode::NOT_FOUND,
2744 "{\"error\":\"session not found: ghost\"}",
2745 )
2746 }),
2747 );
2748 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2749 let addr = listener.local_addr().unwrap();
2750 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2751 let node = format!("127.0.0.1:{}", addr.port());
2752
2753 let cli = Cli {
2754 node,
2755 token: None,
2756 command: Some(Commands::Interventions {
2757 name: "ghost".into(),
2758 }),
2759 path: None,
2760 };
2761 let err = execute(&cli).await.unwrap_err();
2762 assert_eq!(err.to_string(), "session not found: ghost");
2763 }
2764
2765 #[tokio::test]
2766 async fn test_execute_resume_success() {
2767 let node = start_test_server().await;
2768 let cli = Cli {
2769 node,
2770 token: None,
2771 command: Some(Commands::Resume {
2772 name: "test-session".into(),
2773 }),
2774 path: None,
2775 };
2776 let result = execute(&cli).await.unwrap();
2777 assert!(result.contains("Detached from session"));
2778 }
2779
2780 #[tokio::test]
2781 async fn test_execute_input_success() {
2782 let node = start_test_server().await;
2783 let cli = Cli {
2784 node,
2785 token: None,
2786 command: Some(Commands::Input {
2787 name: "test-session".into(),
2788 text: Some("yes".into()),
2789 }),
2790 path: None,
2791 };
2792 let result = execute(&cli).await.unwrap();
2793 assert!(result.contains("Sent input to session test-session"));
2794 }
2795
2796 #[tokio::test]
2797 async fn test_execute_input_no_text() {
2798 let node = start_test_server().await;
2799 let cli = Cli {
2800 node,
2801 token: None,
2802 command: Some(Commands::Input {
2803 name: "test-session".into(),
2804 text: None,
2805 }),
2806 path: None,
2807 };
2808 let result = execute(&cli).await.unwrap();
2809 assert!(result.contains("Sent input to session test-session"));
2810 }
2811
2812 #[tokio::test]
2813 async fn test_execute_input_connection_refused() {
2814 let cli = Cli {
2815 node: "localhost:1".into(),
2816 token: None,
2817 command: Some(Commands::Input {
2818 name: "test".into(),
2819 text: Some("y".into()),
2820 }),
2821 path: None,
2822 };
2823 let result = execute(&cli).await;
2824 let err = result.unwrap_err().to_string();
2825 assert!(err.contains("Could not connect to pulpod"));
2826 }
2827
2828 #[tokio::test]
2829 async fn test_execute_input_error_response() {
2830 use axum::{Router, http::StatusCode, routing::post};
2831
2832 let app = Router::new().route(
2833 "/api/v1/sessions/{id}/input",
2834 post(|| async {
2835 (
2836 StatusCode::NOT_FOUND,
2837 "{\"error\":\"session not found: ghost\"}",
2838 )
2839 }),
2840 );
2841 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2842 let addr = listener.local_addr().unwrap();
2843 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2844 let node = format!("127.0.0.1:{}", addr.port());
2845
2846 let cli = Cli {
2847 node,
2848 token: None,
2849 command: Some(Commands::Input {
2850 name: "ghost".into(),
2851 text: Some("y".into()),
2852 }),
2853 path: None,
2854 };
2855 let err = execute(&cli).await.unwrap_err();
2856 assert_eq!(err.to_string(), "session not found: ghost");
2857 }
2858
2859 #[tokio::test]
2860 async fn test_execute_ui() {
2861 let cli = Cli {
2862 node: "localhost:7433".into(),
2863 token: None,
2864 command: Some(Commands::Ui),
2865 path: None,
2866 };
2867 let result = execute(&cli).await.unwrap();
2868 assert!(result.contains("Opening"));
2869 assert!(result.contains("http://localhost:7433"));
2870 }
2871
2872 #[tokio::test]
2873 async fn test_execute_ui_custom_node() {
2874 let cli = Cli {
2875 node: "mac-mini:7433".into(),
2876 token: None,
2877 command: Some(Commands::Ui),
2878 path: None,
2879 };
2880 let result = execute(&cli).await.unwrap();
2881 assert!(result.contains("http://mac-mini:7433"));
2882 }
2883
/// An empty session list renders as the fixed "No sessions." message.
#[test]
fn test_format_sessions_empty() {
    let rendered = format_sessions(&[]);
    assert_eq!(rendered, "No sessions.");
}
2888
/// A single active session renders the column headers plus its id prefix, name, status
/// and command.
#[test]
fn test_format_sessions_with_data() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "my-api".into(),
        workdir: "/tmp/repo".into(),
        command: "claude -p 'Fix the bug'".into(),
        description: Some("Fix the bug".into()),
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        worktree_branch: None,
        git_branch: None,
        git_commit: None,
        git_files_changed: None,
        git_insertions: None,
        git_deletions: None,
        git_ahead: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let output = format_sessions(&[session]);

    // Column headers first, then the values taken from the session above.
    let expected_fragments = [
        "ID",
        "NAME",
        "BRANCH",
        "COMMAND",
        "00000000",
        "my-api",
        "active",
        "claude -p 'Fix the bug'",
    ];
    for fragment in expected_fragments {
        assert!(output.contains(fragment));
    }
}
2935
/// A session without a git branch renders the "-" placeholder.
#[test]
fn test_format_branch_without_branch() {
    let session = repo_session("/home/user/test", None);
    let rendered = format_branch(&session);
    assert_eq!(rendered, "-");
}
2941
/// A session with a git branch and no extra stats renders just the branch name.
#[test]
fn test_format_branch_with_branch() {
    let session = repo_session("/home/user/pulpo", Some("main"));
    let rendered = format_branch(&session);
    assert_eq!(rendered, "main");
}
2947
/// Non-zero insertions and deletions are appended to the branch as `+N/-M`.
#[test]
fn test_format_branch_with_diff_stats() {
    let mut session = repo_session("/home/user/pulpo", Some("main"));
    session.git_insertions = Some(42);
    session.git_deletions = Some(7);
    assert_eq!(format_branch(&session), "main +42/-7");
}
2956
/// A non-zero ahead count is rendered as an up arrow (U+2191) followed by the count.
#[test]
fn test_format_branch_with_ahead() {
    let mut session = repo_session("/home/user/pulpo", Some("main"));
    session.git_ahead = Some(3);
    let rendered = format_branch(&session);
    assert!(rendered.contains("\u{2191}3"));
}
2964
/// Zero insertions and deletions must not be rendered as `+0/-0`.
///
/// NOTE(review): the session here has no branch; if `format_branch` returns its
/// placeholder before looking at diff stats, this never reaches the zero-hiding
/// logic. Confirm against `format_branch`.
#[test]
fn test_format_branch_zero_diff_hidden() {
    let mut session = repo_session("/home/user/pulpo", None);
    session.git_insertions = Some(0);
    session.git_deletions = Some(0);
    let rendered = format_branch(&session);
    assert!(!rendered.contains("+0/-0"));
}
2973
/// An ahead count of zero must not render the up arrow (U+2191).
///
/// NOTE(review): the session here has no branch; if `format_branch` returns its
/// placeholder before looking at the ahead count, this never reaches the zero-hiding
/// logic. Confirm against `format_branch`.
#[test]
fn test_format_branch_zero_ahead_hidden() {
    let mut session = repo_session("/home/user/pulpo", None);
    session.git_ahead = Some(0);
    let rendered = format_branch(&session);
    assert!(!rendered.contains('\u{2191}'));
}
2981
/// A session carrying a git branch shows that branch name in the listing.
#[test]
fn test_format_sessions_with_git_branch() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "my-api".into(),
        workdir: "/tmp/repo".into(),
        command: "echo hello".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        worktree_branch: None,
        // The only git data set for this session.
        git_branch: Some("main".into()),
        git_commit: Some("abc1234".into()),
        git_files_changed: None,
        git_insertions: None,
        git_deletions: None,
        git_ahead: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let output = format_sessions(&[session]);
    assert!(output.contains("main"), "should show branch: {output}");
}
3021
/// A session whose metadata has an `error_status` entry is flagged with `[!]`.
#[test]
fn test_format_sessions_with_error_status() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use std::collections::HashMap;
    use uuid::Uuid;

    let mut metadata = HashMap::new();
    metadata.insert("error_status".into(), "Compile error".into());

    let session = Session {
        id: Uuid::nil(),
        name: "my-api".into(),
        workdir: "/tmp/repo".into(),
        command: "echo hello".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: Some(metadata),
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        worktree_branch: None,
        git_branch: None,
        git_commit: None,
        git_files_changed: None,
        git_insertions: None,
        git_deletions: None,
        git_ahead: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let rendered = format_sessions(&[session]);
    assert!(rendered.contains("[!]"));
}
3063
/// A Docker-runtime session with no git branch is listed by name, with a "-" placeholder
/// somewhere in its row besides the dash in the name itself.
#[test]
fn test_format_sessions_docker_runtime() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let sessions = vec![Session {
        id: Uuid::nil(),
        name: "sandbox-test".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: Some("docker:pulpo-sandbox-test".into()),
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        worktree_branch: None,
        git_branch: None,
        git_commit: None,
        git_files_changed: None,
        git_insertions: None,
        git_deletions: None,
        git_ahead: None,
        runtime: Runtime::Docker,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    }];
    let output = format_sessions(&sessions);
    assert!(
        output.contains("sandbox-test"),
        "should show name: {output}"
    );

    // The session name itself contains '-', so searching the whole output for a dash
    // would pass even if the branch placeholder were missing. Look only at this
    // session's row, with the name removed, before checking for the placeholder.
    let row = output
        .lines()
        .find(|line| line.contains("sandbox-test"))
        .expect("a line containing the session name");
    assert!(
        row.replace("sandbox-test", "").contains('-'),
        "branch should show dash when None: {output}"
    );
}
3110
/// A long command is shortened in the listing, leaving an ellipsis ("...") in the output.
#[test]
fn test_format_sessions_long_command_truncated() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        command:
            "claude -p 'A very long command that exceeds fifty characters in total length here'"
                .into(),
        description: None,
        status: SessionStatus::Ready,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        worktree_branch: None,
        git_branch: None,
        git_commit: None,
        git_files_changed: None,
        git_insertions: None,
        git_deletions: None,
        git_ahead: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let rendered = format_sessions(&[session]);
    assert!(rendered.contains("..."));
}
3152
/// A session with a worktree path gets a `[wt]` marker right after its name.
#[test]
fn test_format_sessions_worktree_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "wt-task".into(),
        workdir: "/repo".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        // Worktree fields are the only thing distinguishing this session.
        worktree_path: Some("/home/user/.pulpo/worktrees/wt-task".into()),
        worktree_branch: Some("wt-task".into()),
        git_branch: None,
        git_commit: None,
        git_files_changed: None,
        git_insertions: None,
        git_deletions: None,
        git_ahead: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let output = format_sessions(&[session]);
    assert!(
        output.contains("[wt]"),
        "should show worktree indicator: {output}"
    );
    assert!(output.contains("wt-task [wt]"));
}
3196
/// A session whose metadata has a `pr_url` entry gets a `[PR]` marker right after its name.
#[test]
fn test_format_sessions_pr_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use std::collections::HashMap;
    use uuid::Uuid;

    let mut metadata = HashMap::new();
    metadata.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());

    let session = Session {
        id: Uuid::nil(),
        name: "pr-task".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: Some(metadata),
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        worktree_branch: None,
        git_branch: None,
        git_commit: None,
        git_files_changed: None,
        git_insertions: None,
        git_deletions: None,
        git_ahead: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let output = format_sessions(&[session]);
    assert!(
        output.contains("[PR]"),
        "should show PR indicator: {output}"
    );
    assert!(output.contains("pr-task [PR]"));
}
3243
/// A session with both a worktree and a `pr_url` shows the markers together as `[wt] [PR]`.
#[test]
fn test_format_sessions_worktree_and_pr_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use std::collections::HashMap;
    use uuid::Uuid;

    let mut metadata = HashMap::new();
    metadata.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());

    let session = Session {
        id: Uuid::nil(),
        name: "both-task".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: Some(metadata),
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: Some("/home/user/.pulpo/worktrees/both-task".into()),
        worktree_branch: Some("both-task".into()),
        git_branch: None,
        git_commit: None,
        git_files_changed: None,
        git_insertions: None,
        git_deletions: None,
        git_ahead: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let output = format_sessions(&[session]);
    assert!(
        output.contains("[wt] [PR]"),
        "should show both indicators: {output}"
    );
}
3289
/// A session without metadata must not show the `[PR]` marker.
#[test]
fn test_format_sessions_no_pr_without_metadata() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "no-pr".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        worktree_branch: None,
        git_branch: None,
        git_commit: None,
        git_files_changed: None,
        git_insertions: None,
        git_deletions: None,
        git_ahead: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let output = format_sessions(&[session]);
    assert!(
        !output.contains("[PR]"),
        "should not show PR indicator: {output}"
    );
}
3332
/// The node listing shows the local node (tagged "(local)") and an online peer together
/// with its session count.
#[test]
fn test_format_nodes() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "mac-mini".into(),
        hostname: "h".into(),
        os: "macos".into(),
        arch: "arm64".into(),
        cpus: 8,
        memory_mb: 16384,
        gpu: None,
    };
    let peer = PeerInfo {
        name: "win-pc".into(),
        address: "win-pc:7433".into(),
        status: PeerStatus::Online,
        node_info: None,
        session_count: Some(3),
        source: PeerSource::Configured,
    };
    let response = PeersResponse {
        local,
        peers: vec![peer],
    };

    let rendered = format_nodes(&response);
    assert!(rendered.contains("mac-mini"));
    assert!(rendered.contains("(local)"));
    assert!(rendered.contains("win-pc"));
    assert!(rendered.contains('3'));
}
3363
/// An offline peer with no session count is listed as "offline" with a dash on its row.
#[test]
fn test_format_nodes_no_session_count() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "local".into(),
        hostname: "h".into(),
        os: "linux".into(),
        arch: "x86_64".into(),
        cpus: 4,
        memory_mb: 8192,
        gpu: None,
    };
    let peer = PeerInfo {
        name: "peer".into(),
        address: "peer:7433".into(),
        status: PeerStatus::Offline,
        node_info: None,
        session_count: None,
        source: PeerSource::Configured,
    };
    let response = PeersResponse {
        local,
        peers: vec![peer],
    };

    let rendered = format_nodes(&response);
    assert!(rendered.contains("offline"));

    // Third line of the output; presumably header, local node, then the peer row.
    let rows: Vec<&str> = rendered.lines().collect();
    let peer_row = rows[2];
    assert!(peer_row.contains('-'));
}
3394
3395 #[tokio::test]
3396 async fn test_execute_resume_connection_refused() {
3397 let cli = Cli {
3398 node: "localhost:1".into(),
3399 token: None,
3400 command: Some(Commands::Resume {
3401 name: "test".into(),
3402 }),
3403 path: None,
3404 };
3405 let result = execute(&cli).await;
3406 let err = result.unwrap_err().to_string();
3407 assert!(err.contains("Could not connect to pulpod"));
3408 }
3409
3410 #[tokio::test]
3411 async fn test_execute_spawn_connection_refused() {
3412 let cli = Cli {
3413 node: "localhost:1".into(),
3414 token: None,
3415 command: Some(Commands::Spawn {
3416 name: Some("test".into()),
3417 workdir: Some("/tmp".into()),
3418 ink: None,
3419 description: None,
3420 detach: true,
3421 idle_threshold: None,
3422 auto: false,
3423 worktree: false,
3424 worktree_base: None,
3425 runtime: None,
3426 secret: vec![],
3427 command: vec!["test".into()],
3428 }),
3429 path: None,
3430 };
3431 let result = execute(&cli).await;
3432 let err = result.unwrap_err().to_string();
3433 assert!(err.contains("Could not connect to pulpod"));
3434 }
3435
3436 #[tokio::test]
3437 async fn test_execute_stop_connection_refused() {
3438 let cli = Cli {
3439 node: "localhost:1".into(),
3440 token: None,
3441 command: Some(Commands::Stop {
3442 names: vec!["test".into()],
3443 purge: false,
3444 }),
3445 path: None,
3446 };
3447 let result = execute(&cli).await;
3448 let err = result.unwrap_err().to_string();
3449 assert!(err.contains("Could not connect to pulpod"));
3450 }
3451
3452 #[tokio::test]
3453 async fn test_execute_logs_connection_refused() {
3454 let cli = Cli {
3455 node: "localhost:1".into(),
3456 token: None,
3457 command: Some(Commands::Logs {
3458 name: "test".into(),
3459 lines: 50,
3460 follow: false,
3461 }),
3462 path: None,
3463 };
3464 let result = execute(&cli).await;
3465 let err = result.unwrap_err().to_string();
3466 assert!(err.contains("Could not connect to pulpod"));
3467 }
3468
3469 #[tokio::test]
3470 async fn test_friendly_error_connect() {
3471 let err = reqwest::Client::new()
3473 .get("http://127.0.0.1:1")
3474 .send()
3475 .await
3476 .unwrap_err();
3477 let friendly = friendly_error(&err, "test-node:1");
3478 let msg = friendly.to_string();
3479 assert!(
3480 msg.contains("Could not connect"),
3481 "Expected connect message, got: {msg}"
3482 );
3483 }
3484
3485 #[tokio::test]
3486 async fn test_friendly_error_other() {
3487 let err = reqwest::Client::new()
3489 .get("http://[::invalid::url")
3490 .send()
3491 .await
3492 .unwrap_err();
3493 let friendly = friendly_error(&err, "bad-host");
3494 let msg = friendly.to_string();
3495 assert!(
3496 msg.contains("Network error"),
3497 "Expected network error message, got: {msg}"
3498 );
3499 assert!(msg.contains("bad-host"));
3500 }
3501
/// `is_localhost` accepts the loopback spellings, with or without a port, and rejects
/// other hosts.
#[test]
fn test_is_localhost_variants() {
    let loopback = [
        "localhost:7433",
        "127.0.0.1:7433",
        "[::1]:7433",
        "::1",
        "localhost",
    ];
    for node in loopback {
        assert!(is_localhost(node));
    }

    let remote = ["mac-mini:7433", "192.168.1.100:7433"];
    for node in remote {
        assert!(!is_localhost(node));
    }
}
3514
/// `authed_get` with a token sets an `authorization: Bearer <token>` header.
#[test]
fn test_authed_get_with_token() {
    let client = reqwest::Client::new();
    let builder = authed_get(&client, "http://h:1/api".into(), Some("tok"));
    let request = builder.build().unwrap();
    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer tok");
}
3529
/// `authed_get` without a token leaves the `authorization` header unset.
#[test]
fn test_authed_get_without_token() {
    let client = reqwest::Client::new();
    let builder = authed_get(&client, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();
    assert!(request.headers().get("authorization").is_none());
}
3538
/// `authed_post` with a token sets an `authorization: Bearer <token>` header.
#[test]
fn test_authed_post_with_token() {
    let client = reqwest::Client::new();
    let builder = authed_post(&client, "http://h:1/api".into(), Some("secret"));
    let request = builder.build().unwrap();
    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer secret");
}
3553
/// `authed_post` without a token leaves the `authorization` header unset.
#[test]
fn test_authed_post_without_token() {
    let client = reqwest::Client::new();
    let builder = authed_post(&client, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();
    assert!(request.headers().get("authorization").is_none());
}
3562
/// `authed_delete` with a token sets an `authorization: Bearer <token>` header.
#[test]
fn test_authed_delete_with_token() {
    let client = reqwest::Client::new();
    let builder = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"));
    let request = builder.build().unwrap();
    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer del-tok");
}
3577
/// `authed_delete` without a token leaves the `authorization` header unset.
#[test]
fn test_authed_delete_without_token() {
    let client = reqwest::Client::new();
    let builder = authed_delete(&client, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();
    assert!(request.headers().get("authorization").is_none());
}
3586
3587 #[tokio::test]
3588 async fn test_resolve_token_explicit() {
3589 let client = reqwest::Client::new();
3590 let token =
3591 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
3592 assert_eq!(token, Some("my-tok".into()));
3593 }
3594
3595 #[tokio::test]
3596 async fn test_resolve_token_remote_no_explicit() {
3597 let client = reqwest::Client::new();
3598 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
3599 assert_eq!(token, None);
3600 }
3601
3602 #[tokio::test]
3603 async fn test_resolve_token_localhost_auto_discover() {
3604 use axum::{Json, Router, routing::get};
3605
3606 let app = Router::new().route(
3607 "/api/v1/auth/token",
3608 get(|| async {
3609 Json(AuthTokenResponse {
3610 token: "discovered".into(),
3611 })
3612 }),
3613 );
3614 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3615 let addr = listener.local_addr().unwrap();
3616 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3617
3618 let node = format!("localhost:{}", addr.port());
3619 let base = base_url(&node);
3620 let client = reqwest::Client::new();
3621 let token = resolve_token(&client, &base, &node, None).await;
3622 assert_eq!(token, Some("discovered".into()));
3623 }
3624
3625 #[tokio::test]
3626 async fn test_discover_token_empty_returns_none() {
3627 use axum::{Json, Router, routing::get};
3628
3629 let app = Router::new().route(
3630 "/api/v1/auth/token",
3631 get(|| async {
3632 Json(AuthTokenResponse {
3633 token: String::new(),
3634 })
3635 }),
3636 );
3637 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3638 let addr = listener.local_addr().unwrap();
3639 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3640
3641 let base = format!("http://127.0.0.1:{}", addr.port());
3642 let client = reqwest::Client::new();
3643 assert_eq!(discover_token(&client, &base).await, None);
3644 }
3645
3646 #[tokio::test]
3647 async fn test_discover_token_unreachable_returns_none() {
3648 let client = reqwest::Client::new();
3649 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3650 }
3651
/// `--token <value>` before the subcommand is parsed into `Cli::token`.
#[test]
fn test_cli_parse_with_token() {
    let args = ["pulpo", "--token", "my-secret", "list"];
    let cli = Cli::try_parse_from(args).unwrap();
    assert_eq!(cli.token.as_deref(), Some("my-secret"));
}
3657
/// Without `--token`, `Cli::token` is `None`.
#[test]
fn test_cli_parse_without_token() {
    let args = ["pulpo", "list"];
    let cli = Cli::try_parse_from(args).unwrap();
    assert!(cli.token.is_none());
}
3663
3664 #[tokio::test]
3665 async fn test_execute_with_explicit_token_sends_header() {
3666 use axum::{Router, extract::Request, http::StatusCode, routing::get};
3667
3668 let app = Router::new().route(
3669 "/api/v1/sessions",
3670 get(|req: Request| async move {
3671 let auth = req
3672 .headers()
3673 .get("authorization")
3674 .and_then(|v| v.to_str().ok())
3675 .unwrap_or("");
3676 assert_eq!(auth, "Bearer test-token");
3677 (StatusCode::OK, "[]".to_owned())
3678 }),
3679 );
3680 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3681 let addr = listener.local_addr().unwrap();
3682 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3683 let node = format!("127.0.0.1:{}", addr.port());
3684
3685 let cli = Cli {
3686 node,
3687 token: Some("test-token".into()),
3688 command: Some(Commands::List { all: false }),
3689 path: None,
3690 };
3691 let result = execute(&cli).await.unwrap();
3692 assert_eq!(result, "No sessions.");
3693 }
3694
/// `interventions <name>` parses into `Commands::Interventions` carrying the session name.
#[test]
fn test_cli_parse_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Interventions { name }) if name == "my-session"
    );
    assert!(is_expected);
}
3705
/// An empty event slice renders the fixed placeholder message.
#[test]
fn test_format_interventions_empty() {
    let rendered = format_interventions(&[]);
    assert_eq!(rendered, "No intervention events.");
}
3710
/// Two events render a table containing the column headers, both reasons and a timestamp.
#[test]
fn test_format_interventions_with_data() {
    let memory_event = InterventionEventResponse {
        id: 1,
        session_id: "sess-1".into(),
        code: None,
        reason: "Memory exceeded threshold".into(),
        created_at: "2026-01-01T00:00:00Z".into(),
    };
    let idle_event = InterventionEventResponse {
        id: 2,
        session_id: "sess-1".into(),
        code: None,
        reason: "Idle for 10 minutes".into(),
        created_at: "2026-01-02T00:00:00Z".into(),
    };

    let rendered = format_interventions(&[memory_event, idle_event]);

    // Headers first, then row content.
    for expected in [
        "ID",
        "TIMESTAMP",
        "REASON",
        "Memory exceeded threshold",
        "Idle for 10 minutes",
        "2026-01-01T00:00:00Z",
    ] {
        assert!(rendered.contains(expected));
    }
}
3737
3738 #[tokio::test]
3739 async fn test_execute_interventions_empty() {
3740 let node = start_test_server().await;
3741 let cli = Cli {
3742 node,
3743 token: None,
3744 command: Some(Commands::Interventions {
3745 name: "my-session".into(),
3746 }),
3747 path: None,
3748 };
3749 let result = execute(&cli).await.unwrap();
3750 assert_eq!(result, "No intervention events.");
3751 }
3752
3753 #[tokio::test]
3754 async fn test_execute_interventions_with_data() {
3755 use axum::{Router, routing::get};
3756
3757 let app = Router::new().route(
3758 "/api/v1/sessions/{id}/interventions",
3759 get(|| async {
3760 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3761 .to_owned()
3762 }),
3763 );
3764 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3765 let addr = listener.local_addr().unwrap();
3766 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3767 let node = format!("127.0.0.1:{}", addr.port());
3768
3769 let cli = Cli {
3770 node,
3771 token: None,
3772 command: Some(Commands::Interventions {
3773 name: "test".into(),
3774 }),
3775 path: None,
3776 };
3777 let result = execute(&cli).await.unwrap();
3778 assert!(result.contains("OOM"));
3779 assert!(result.contains("2026-01-01T00:00:00Z"));
3780 }
3781
3782 #[tokio::test]
3783 async fn test_execute_interventions_connection_refused() {
3784 let cli = Cli {
3785 node: "localhost:1".into(),
3786 token: None,
3787 command: Some(Commands::Interventions {
3788 name: "test".into(),
3789 }),
3790 path: None,
3791 };
3792 let result = execute(&cli).await;
3793 let err = result.unwrap_err().to_string();
3794 assert!(err.contains("Could not connect to pulpod"));
3795 }
3796
/// A plain backend id produces a `tmux attach-session -t <id>` command.
#[test]
fn test_build_attach_command_tmux() {
    let attach = build_attach_command("my-session");
    let program = attach.get_program();
    let argv = attach.get_args().collect::<Vec<&std::ffi::OsStr>>();
    assert_eq!(program, "tmux");
    assert_eq!(argv, ["attach-session", "-t", "my-session"]);
}
3806
/// A `docker:`-prefixed backend id produces `docker exec -it <container> /bin/sh`.
#[test]
fn test_build_attach_command_docker() {
    let attach = build_attach_command("docker:pulpo-my-task");
    let program = attach.get_program();
    let argv = attach.get_args().collect::<Vec<&std::ffi::OsStr>>();
    assert_eq!(program, "docker");
    assert_eq!(argv, ["exec", "-it", "pulpo-my-task", "/bin/sh"]);
}
3814
/// `attach <name>` parses into `Commands::Attach` with the given session name.
#[test]
fn test_cli_parse_attach() {
    let parsed = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Attach { name }) if name == "my-session"
    );
    assert!(is_expected);
}
3823
/// The short alias `a` resolves to the same `Commands::Attach` variant.
#[test]
fn test_cli_parse_attach_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Attach { name }) if name == "my-session"
    );
    assert!(is_expected);
}
3832
3833 #[tokio::test]
3834 async fn test_execute_attach_success() {
3835 let node = start_test_server().await;
3836 let cli = Cli {
3837 node,
3838 token: None,
3839 command: Some(Commands::Attach {
3840 name: "test-session".into(),
3841 }),
3842 path: None,
3843 };
3844 let result = execute(&cli).await.unwrap();
3845 assert!(result.contains("Detached from session test-session"));
3846 }
3847
3848 #[tokio::test]
3849 async fn test_execute_attach_with_backend_session_id() {
3850 use axum::{Router, routing::get};
3851 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3852 let app = Router::new().route(
3853 "/api/v1/sessions/{id}",
3854 get(move || async move { session_json.to_owned() }),
3855 );
3856 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3857 let addr = listener.local_addr().unwrap();
3858 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3859
3860 let cli = Cli {
3861 node: format!("127.0.0.1:{}", addr.port()),
3862 token: None,
3863 command: Some(Commands::Attach {
3864 name: "my-session".into(),
3865 }),
3866 path: None,
3867 };
3868 let result = execute(&cli).await.unwrap();
3869 assert!(result.contains("Detached from session my-session"));
3870 }
3871
3872 #[tokio::test]
3873 async fn test_execute_attach_connection_refused() {
3874 let cli = Cli {
3875 node: "localhost:1".into(),
3876 token: None,
3877 command: Some(Commands::Attach {
3878 name: "test-session".into(),
3879 }),
3880 path: None,
3881 };
3882 let result = execute(&cli).await;
3883 let err = result.unwrap_err().to_string();
3884 assert!(err.contains("Could not connect to pulpod"));
3885 }
3886
3887 #[tokio::test]
3888 async fn test_execute_attach_error_response() {
3889 use axum::{Router, http::StatusCode, routing::get};
3890 let app = Router::new().route(
3891 "/api/v1/sessions/{id}",
3892 get(|| async {
3893 (
3894 StatusCode::NOT_FOUND,
3895 r#"{"error":"session not found"}"#.to_owned(),
3896 )
3897 }),
3898 );
3899 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3900 let addr = listener.local_addr().unwrap();
3901 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3902
3903 let cli = Cli {
3904 node: format!("127.0.0.1:{}", addr.port()),
3905 token: None,
3906 command: Some(Commands::Attach {
3907 name: "nonexistent".into(),
3908 }),
3909 path: None,
3910 };
3911 let result = execute(&cli).await;
3912 let err = result.unwrap_err().to_string();
3913 assert!(err.contains("session not found"));
3914 }
3915
3916 #[tokio::test]
3917 async fn test_execute_attach_stale_session() {
3918 use axum::{Router, routing::get};
3919 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3920 let app = Router::new().route(
3921 "/api/v1/sessions/{id}",
3922 get(move || async move { session_json.to_owned() }),
3923 );
3924 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3925 let addr = listener.local_addr().unwrap();
3926 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3927
3928 let cli = Cli {
3929 node: format!("127.0.0.1:{}", addr.port()),
3930 token: None,
3931 command: Some(Commands::Attach {
3932 name: "stale-sess".into(),
3933 }),
3934 path: None,
3935 };
3936 let result = execute(&cli).await;
3937 let err = result.unwrap_err().to_string();
3938 assert!(err.contains("lost"));
3939 assert!(err.contains("pulpo resume"));
3940 }
3941
3942 #[tokio::test]
3943 async fn test_execute_attach_dead_session() {
3944 use axum::{Router, routing::get};
3945 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3946 let app = Router::new().route(
3947 "/api/v1/sessions/{id}",
3948 get(move || async move { session_json.to_owned() }),
3949 );
3950 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3951 let addr = listener.local_addr().unwrap();
3952 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3953
3954 let cli = Cli {
3955 node: format!("127.0.0.1:{}", addr.port()),
3956 token: None,
3957 command: Some(Commands::Attach {
3958 name: "dead-sess".into(),
3959 }),
3960 path: None,
3961 };
3962 let result = execute(&cli).await;
3963 let err = result.unwrap_err().to_string();
3964 assert!(err.contains("stopped"));
3965 assert!(err.contains("cannot attach"));
3966 }
3967
/// The short alias `s` resolves to `Commands::Spawn`.
#[test]
fn test_cli_parse_alias_spawn() {
    let argv = ["pulpo", "s", "my-task", "--", "echo", "hello"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_spawn = matches!(&parsed.command, Some(Commands::Spawn { .. }));
    assert!(is_spawn);
}
3975
/// The alias `ls` resolves to `Commands::List` with `all` left off.
#[test]
fn test_cli_parse_alias_list() {
    let parsed = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
    let is_plain_list = matches!(&parsed.command, Some(Commands::List { all: false }));
    assert!(is_plain_list);
}
3981
/// Both the short (`-a`) and long (`--all`) flag spellings switch `all` on.
#[test]
fn test_cli_parse_list_all() {
    let spellings = [
        vec!["pulpo", "ls", "-a"],
        vec!["pulpo", "list", "--all"],
    ];
    for argv in spellings {
        let parsed = Cli::try_parse_from(argv).unwrap();
        assert!(matches!(
            &parsed.command,
            Some(Commands::List { all: true })
        ));
    }
}
3990
/// The short alias `l` resolves to `Commands::Logs` with the given session name.
#[test]
fn test_cli_parse_alias_logs() {
    let parsed = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Logs { name, .. }) if name == "my-session"
    );
    assert!(is_expected);
}
3999
/// The short alias `k` resolves to `Commands::Stop` with one name and `purge` off.
#[test]
fn test_cli_parse_alias_stop() {
    let parsed = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
    );
    assert!(is_expected);
}
4008
/// The short alias `r` resolves to `Commands::Resume` with the given session name.
#[test]
fn test_cli_parse_alias_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Resume { name }) if name == "my-session"
    );
    assert!(is_expected);
}
4017
/// The short alias `n` resolves to `Commands::Nodes`.
#[test]
fn test_cli_parse_alias_nodes() {
    let parsed = Cli::try_parse_from(["pulpo", "n"]).unwrap();
    let is_nodes = matches!(&parsed.command, Some(Commands::Nodes));
    assert!(is_nodes);
}
4023
/// The short alias `iv` resolves to `Commands::Interventions` with the given session name.
#[test]
fn test_cli_parse_alias_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Interventions { name }) if name == "my-session"
    );
    assert!(is_expected);
}
4032
/// A JSON body with an `error` field is reduced to just that field's text.
#[test]
fn test_api_error_json() {
    let body = r#"{"error":"session not found: foo"}"#;
    let rendered = api_error(body).to_string();
    assert_eq!(rendered, "session not found: foo");
}
4038
/// A non-JSON body is passed through verbatim as the error text.
#[test]
fn test_api_error_plain_text() {
    let rendered = api_error("plain text error").to_string();
    assert_eq!(rendered, "plain text error");
}
4044
/// With no previous output, the whole new output counts as the diff.
#[test]
fn test_diff_output_empty_prev() {
    let current = "line1\nline2\n";
    assert_eq!(diff_output("", current), "line1\nline2\n");
}
4051
/// Identical snapshots produce an empty diff.
#[test]
fn test_diff_output_identical() {
    let snapshot = "line1\nline2";
    assert_eq!(diff_output(snapshot, snapshot), "");
}
4056
/// When the new snapshot extends the old one, only the appended lines are returned.
#[test]
fn test_diff_output_new_lines_appended() {
    let (before, after) = ("line1\nline2", "line1\nline2\nline3\nline4");
    let delta = diff_output(before, after);
    assert_eq!(delta, "line3\nline4");
}
4063
/// When the window scrolled by one line, only the line past the overlap is returned.
#[test]
fn test_diff_output_scrolled_window() {
    let (before, after) = ("line1\nline2\nline3", "line2\nline3\nline4");
    let delta = diff_output(before, after);
    assert_eq!(delta, "line4");
}
4071
/// With no shared lines at all, the full new snapshot is returned.
#[test]
fn test_diff_output_completely_different() {
    let (before, after) = ("aaa\nbbb", "xxx\nyyy");
    let delta = diff_output(before, after);
    assert_eq!(delta, "xxx\nyyy");
}
4078
/// The old last line reappears in the new snapshot but the line before it differs,
/// so the expected diff is the full new snapshot.
#[test]
fn test_diff_output_last_line_matches_but_overlap_fails() {
    let (before, after) = ("aaa\ncommon", "zzz\ncommon\nnew_line");
    let delta = diff_output(before, after);
    assert_eq!(delta, "zzz\ncommon\nnew_line");
}
4089
/// An empty new snapshot produces an empty diff.
#[test]
fn test_diff_output_new_empty() {
    let delta = diff_output("line1", "");
    assert_eq!(delta, "");
}
4094
4095 async fn start_follow_test_server() -> String {
4100 use axum::{Router, extract::Path, extract::Query, routing::get};
4101 use std::sync::Arc;
4102 use std::sync::atomic::{AtomicUsize, Ordering};
4103
4104 let call_count = Arc::new(AtomicUsize::new(0));
4105 let output_count = call_count.clone();
4106
4107 let app = Router::new()
4108 .route(
4109 "/api/v1/sessions/{id}/output",
4110 get(
4111 move |_path: Path<String>,
4112 _query: Query<std::collections::HashMap<String, String>>| {
4113 let count = output_count.clone();
4114 async move {
4115 let n = count.fetch_add(1, Ordering::SeqCst);
4116 let output = match n {
4117 0 => "line1\nline2".to_owned(),
4118 1 => "line1\nline2\nline3".to_owned(),
4119 _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
4120 };
4121 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
4122 }
4123 },
4124 ),
4125 )
4126 .route(
4127 "/api/v1/sessions/{id}",
4128 get(|_path: Path<String>| async {
4129 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
4130 }),
4131 );
4132
4133 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4134 let addr = listener.local_addr().unwrap();
4135 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4136 format!("http://127.0.0.1:{}", addr.port())
4137 }
4138
4139 #[tokio::test]
4140 async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
4141 let base = start_follow_test_server().await;
4142 let client = reqwest::Client::new();
4143 let mut buf = Vec::new();
4144
4145 follow_logs(&client, &base, "test", 100, None, &mut buf)
4146 .await
4147 .unwrap();
4148
4149 let output = String::from_utf8(buf).unwrap();
4150 assert!(output.contains("line1"));
4152 assert!(output.contains("line2"));
4153 assert!(output.contains("line3"));
4154 assert!(output.contains("line4"));
4155 assert!(output.contains("[pulpo] Agent exited"));
4156 }
4157
4158 #[tokio::test]
4159 async fn test_execute_logs_follow_success() {
4160 let base = start_follow_test_server().await;
4161 let node = base.strip_prefix("http://").unwrap().to_owned();
4163
4164 let cli = Cli {
4165 node,
4166 token: None,
4167 command: Some(Commands::Logs {
4168 name: "test".into(),
4169 lines: 100,
4170 follow: true,
4171 }),
4172 path: None,
4173 };
4174 let result = execute(&cli).await.unwrap();
4176 assert_eq!(result, "");
4177 }
4178
4179 #[tokio::test]
4180 async fn test_execute_logs_follow_connection_refused() {
4181 let cli = Cli {
4182 node: "localhost:1".into(),
4183 token: None,
4184 command: Some(Commands::Logs {
4185 name: "test".into(),
4186 lines: 50,
4187 follow: true,
4188 }),
4189 path: None,
4190 };
4191 let result = execute(&cli).await;
4192 let err = result.unwrap_err().to_string();
4193 assert!(
4194 err.contains("Could not connect to pulpod"),
4195 "Expected friendly error, got: {err}"
4196 );
4197 }
4198
4199 #[tokio::test]
4200 async fn test_follow_logs_exits_on_dead() {
4201 use axum::{Router, extract::Path, extract::Query, routing::get};
4202
4203 let app = Router::new()
4204 .route(
4205 "/api/v1/sessions/{id}/output",
4206 get(
4207 |_path: Path<String>,
4208 _query: Query<std::collections::HashMap<String, String>>| async {
4209 r#"{"output":"some output"}"#.to_owned()
4210 },
4211 ),
4212 )
4213 .route(
4214 "/api/v1/sessions/{id}",
4215 get(|_path: Path<String>| async {
4216 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
4217 }),
4218 );
4219
4220 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4221 let addr = listener.local_addr().unwrap();
4222 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4223 let base = format!("http://127.0.0.1:{}", addr.port());
4224
4225 let client = reqwest::Client::new();
4226 let mut buf = Vec::new();
4227 follow_logs(&client, &base, "test", 100, None, &mut buf)
4228 .await
4229 .unwrap();
4230
4231 let output = String::from_utf8(buf).unwrap();
4232 assert!(output.contains("some output"));
4233 }
4234
4235 #[tokio::test]
4236 async fn test_follow_logs_exits_on_stale() {
4237 use axum::{Router, extract::Path, extract::Query, routing::get};
4238
4239 let app = Router::new()
4240 .route(
4241 "/api/v1/sessions/{id}/output",
4242 get(
4243 |_path: Path<String>,
4244 _query: Query<std::collections::HashMap<String, String>>| async {
4245 r#"{"output":"stale output"}"#.to_owned()
4246 },
4247 ),
4248 )
4249 .route(
4250 "/api/v1/sessions/{id}",
4251 get(|_path: Path<String>| async {
4252 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
4253 }),
4254 );
4255
4256 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4257 let addr = listener.local_addr().unwrap();
4258 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4259 let base = format!("http://127.0.0.1:{}", addr.port());
4260
4261 let client = reqwest::Client::new();
4262 let mut buf = Vec::new();
4263 follow_logs(&client, &base, "test", 100, None, &mut buf)
4264 .await
4265 .unwrap();
4266
4267 let output = String::from_utf8(buf).unwrap();
4268 assert!(output.contains("stale output"));
4269 }
4270
4271 #[tokio::test]
4272 async fn test_execute_logs_follow_non_reqwest_error() {
4273 use axum::{Router, extract::Path, extract::Query, routing::get};
4274
4275 let app = Router::new()
4277 .route(
4278 "/api/v1/sessions/{id}/output",
4279 get(
4280 |_path: Path<String>,
4281 _query: Query<std::collections::HashMap<String, String>>| async {
4282 r#"{"output":"initial"}"#.to_owned()
4283 },
4284 ),
4285 )
4286 .route(
4287 "/api/v1/sessions/{id}",
4288 get(|_path: Path<String>| async { "not valid json".to_owned() }),
4289 );
4290
4291 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4292 let addr = listener.local_addr().unwrap();
4293 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4294 let node = format!("127.0.0.1:{}", addr.port());
4295
4296 let cli = Cli {
4297 node,
4298 token: None,
4299 command: Some(Commands::Logs {
4300 name: "test".into(),
4301 lines: 100,
4302 follow: true,
4303 }),
4304 path: None,
4305 };
4306 let err = execute(&cli).await.unwrap_err();
4307 let msg = err.to_string();
4309 assert!(
4310 msg.contains("expected ident"),
4311 "Expected serde parse error, got: {msg}"
4312 );
4313 }
4314
4315 #[tokio::test]
4316 async fn test_fetch_session_status_connection_error() {
4317 let client = reqwest::Client::new();
4318 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
4319 assert!(result.is_err());
4320 }
4321
/// An empty schedule slice renders the fixed placeholder message.
#[test]
fn test_format_schedules_empty() {
    let rendered = format_schedules(&[]);
    assert_eq!(rendered, "No schedules.");
}
4328
/// An enabled schedule without a target node or last run renders its name and cron,
/// plus the strings `local`, `yes` and a `-`.
#[test]
fn test_format_schedules_with_entries() {
    let nightly = serde_json::json!({
        "name": "nightly",
        "cron": "0 3 * * *",
        "enabled": true,
        "last_run_at": null,
        "target_node": null
    });
    let rendered = format_schedules(&[nightly]);
    for expected in ["nightly", "0 3 * * *", "local", "yes"] {
        assert!(rendered.contains(expected));
    }
    assert!(rendered.contains('-'));
}
4345
/// A disabled schedule with a target node and a last run renders `no`, the node name
/// and the start of the last-run timestamp.
#[test]
fn test_format_schedules_disabled_entry() {
    let weekly = serde_json::json!({
        "name": "weekly",
        "cron": "0 0 * * 0",
        "enabled": false,
        "last_run_at": "2026-03-18T03:00:00Z",
        "target_node": "gpu-box"
    });
    let rendered = format_schedules(&[weekly]);
    for expected in ["weekly", "no", "gpu-box", "2026-03-18T03:00"] {
        assert!(rendered.contains(expected));
    }
}
4361
/// The rendered schedule table carries all five column headers.
#[test]
fn test_format_schedules_header() {
    let entry = serde_json::json!({
        "name": "test",
        "cron": "* * * * *",
        "enabled": true,
        "last_run_at": null,
        "target_node": null
    });
    let rendered = format_schedules(&[entry]);
    for header in ["NAME", "CRON", "ENABLED", "LAST RUN", "NODE"] {
        assert!(rendered.contains(header));
    }
}
4378
/// `schedule add <name> <cron> --workdir … -- <command…>` parses into `ScheduleAction::Add`.
#[test]
fn test_cli_parse_schedule_add() {
    let argv = [
        "pulpo",
        "schedule",
        "add",
        "nightly",
        "0 3 * * *",
        "--workdir",
        "/repo",
        "--",
        "claude",
        "-p",
        "review",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Schedule {
            action: ScheduleAction::Add { name, cron, .. }
        }) if name == "nightly" && cron == "0 3 * * *"
    );
    assert!(is_expected);
}
4404
/// `--node` given to `schedule add` lands on the action's `node` field.
#[test]
fn test_cli_parse_schedule_add_with_node() {
    let argv = [
        "pulpo",
        "schedule",
        "add",
        "nightly",
        "0 3 * * *",
        "--workdir",
        "/repo",
        "--node",
        "gpu-box",
        "--",
        "claude",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Schedule {
            action: ScheduleAction::Add { node, .. }
        }) if node.as_deref() == Some("gpu-box")
    );
    assert!(is_expected);
}
4428
/// `schedule install` is accepted as an alias for `schedule add`.
#[test]
fn test_cli_parse_schedule_add_install_alias() {
    let argv = ["pulpo", "schedule", "install", "nightly", "0 3 * * *"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Schedule {
            action: ScheduleAction::Add { name, .. }
        }) if name == "nightly"
    );
    assert!(is_expected);
}
4440
/// `schedule list` parses into `ScheduleAction::List`.
#[test]
fn test_cli_parse_schedule_list() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Some(Commands::Schedule {
            action: ScheduleAction::List
        })
    );
    assert!(is_list);
}
4451
/// `schedule remove <name>` parses into `ScheduleAction::Remove`.
#[test]
fn test_cli_parse_schedule_remove() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Schedule {
            action: ScheduleAction::Remove { name }
        }) if name == "nightly"
    );
    assert!(is_expected);
}
4462
/// `schedule pause <name>` parses into `ScheduleAction::Pause`.
#[test]
fn test_cli_parse_schedule_pause() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Schedule {
            action: ScheduleAction::Pause { name }
        }) if name == "nightly"
    );
    assert!(is_expected);
}
4473
/// `schedule resume <name>` parses into `ScheduleAction::Resume`.
#[test]
fn test_cli_parse_schedule_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Schedule {
            action: ScheduleAction::Resume { name }
        }) if name == "nightly"
    );
    assert!(is_expected);
}
4484
/// The top-level alias `sched` resolves to the `schedule` subcommand.
#[test]
fn test_cli_parse_schedule_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Some(Commands::Schedule {
            action: ScheduleAction::List
        })
    );
    assert!(is_list);
}
4495
/// `schedule ls` is accepted as an alias for `schedule list`.
#[test]
fn test_cli_parse_schedule_list_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Some(Commands::Schedule {
            action: ScheduleAction::List
        })
    );
    assert!(is_list);
}
4506
/// `schedule rm <name>` is accepted as an alias for `schedule remove`.
#[test]
fn test_cli_parse_schedule_remove_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Schedule {
            action: ScheduleAction::Remove { name }
        }) if name == "nightly"
    );
    assert!(is_expected);
}
4517
4518 #[tokio::test]
4519 async fn test_execute_schedule_list_via_execute() {
4520 let node = start_test_server().await;
4521 let cli = Cli {
4522 node,
4523 token: None,
4524 command: Some(Commands::Schedule {
4525 action: ScheduleAction::List,
4526 }),
4527 path: None,
4528 };
4529 let result = execute(&cli).await.unwrap();
4530 #[cfg(coverage)]
4532 assert!(result.is_empty());
4533 #[cfg(not(coverage))]
4534 assert_eq!(result, "No schedules.");
4535 }
4536
/// The `Debug` rendering of `ScheduleAction::List` is the bare variant name.
#[test]
fn test_schedule_action_debug() {
    let rendered = format!("{:?}", ScheduleAction::List);
    assert_eq!(rendered, "List");
}
4542
/// The alias `send` resolves to `Commands::Input` with both name and text captured.
#[test]
fn test_cli_parse_send_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
    );
    assert!(is_expected);
}
4551
/// Bare `spawn` parses with no session name and an empty command vector.
#[test]
fn test_cli_parse_spawn_no_name() {
    let parsed = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
    );
    assert!(is_expected);
}
4560
/// Everything after `--` goes to the command vector even when the name is omitted.
#[test]
fn test_cli_parse_spawn_optional_name_with_command() {
    let argv = ["pulpo", "spawn", "--", "echo", "hello"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Spawn { name, command, .. })
            if name.is_none() && command == &["echo", "hello"]
    );
    assert!(is_expected);
}
4570
/// A lone positional argument is taken as `path`, with no subcommand selected.
#[test]
fn test_cli_parse_path_shortcut() {
    let parsed = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
    assert_eq!(parsed.path.as_deref(), Some("/tmp/my-repo"));
    assert!(parsed.command.is_none());
}
4577
/// With no arguments at all, both `command` and `path` stay unset.
#[test]
fn test_cli_parse_no_args() {
    let parsed = Cli::try_parse_from(["pulpo"]).unwrap();
    assert!(parsed.path.is_none());
    assert!(parsed.command.is_none());
}
4584
/// The last path component becomes the session name.
#[test]
fn test_derive_session_name_simple() {
    let derived = derive_session_name("/home/user/my-repo");
    assert_eq!(derived, "my-repo");
}
4589
/// Uppercase letters, spaces and underscores come out lowercased and dash-separated.
#[test]
fn test_derive_session_name_with_special_chars() {
    let derived = derive_session_name("/home/user/My Repo_v2");
    assert_eq!(derived, "my-repo-v2");
}
4594
/// The root path falls back to the name `session`.
#[test]
fn test_derive_session_name_root() {
    let derived = derive_session_name("/");
    assert_eq!(derived, "session");
}
4599
/// A leading dot on the final component is dropped from the derived name.
#[test]
fn test_derive_session_name_dots() {
    let derived = derive_session_name("/home/user/.hidden");
    assert_eq!(derived, "hidden");
}
4604
/// An absolute path is returned unchanged.
#[test]
fn test_resolve_path_absolute() {
    let resolved = resolve_path("/tmp/repo");
    assert_eq!(resolved, "/tmp/repo");
}
4609
/// A relative path resolves to something starting with `/` that still ends in the input.
#[test]
fn test_resolve_path_relative() {
    let absolute = resolve_path("my-repo");
    assert!(absolute.ends_with("my-repo"));
    assert!(absolute.starts_with('/'));
}
4616
4617 #[tokio::test]
4618 async fn test_execute_no_args_shows_help() {
4619 let node = start_test_server().await;
4620 let cli = Cli {
4621 node,
4622 token: None,
4623 path: None,
4624 command: None,
4625 };
4626 let result = execute(&cli).await.unwrap();
4627 assert!(
4628 result.is_empty(),
4629 "no-args should return empty string after printing help"
4630 );
4631 }
4632
4633 #[tokio::test]
4634 async fn test_execute_path_shortcut() {
4635 let node = start_test_server().await;
4636 let cli = Cli {
4637 node,
4638 token: None,
4639 path: Some("/tmp".into()),
4640 command: None,
4641 };
4642 let result = execute(&cli).await.unwrap();
4643 assert!(result.contains("Detached from session"));
4644 }
4645
4646 #[tokio::test]
4647 async fn test_deduplicate_session_name_no_conflict() {
4648 let base = "http://127.0.0.1:1";
4650 let client = reqwest::Client::new();
4651 let name = deduplicate_session_name(&client, base, "fresh", None).await;
4652 assert_eq!(name, "fresh");
4653 }
4654
4655 #[tokio::test]
4656 async fn test_deduplicate_session_name_with_conflict() {
4657 use axum::{Router, routing::get};
4658 use std::sync::atomic::{AtomicU32, Ordering};
4659
4660 let call_count = std::sync::Arc::new(AtomicU32::new(0));
4661 let counter = call_count.clone();
4662 let app = Router::new()
4663 .route(
4664 "/api/v1/sessions/{id}",
4665 get(move || {
4666 let c = counter.clone();
4667 async move {
4668 let n = c.fetch_add(1, Ordering::SeqCst);
4669 if n == 0 {
4670 (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4672 } else {
4673 (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4675 }
4676 }
4677 }),
4678 )
4679 .route(
4680 "/api/v1/peers",
4681 get(|| async {
4682 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4683 }),
4684 );
4685 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4686 let addr = listener.local_addr().unwrap();
4687 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4688 let base = format!("http://127.0.0.1:{}", addr.port());
4689 let client = reqwest::Client::new();
4690 let name = deduplicate_session_name(&client, &base, "repo", None).await;
4691 assert_eq!(name, "repo-2");
4692 }
4693
/// Values that already carry a port need no resolution; bare host names do.
#[test]
fn test_node_needs_resolution() {
    let with_port = [
        "localhost:7433",
        "mac-mini:7433",
        "10.0.0.1:7433",
        "[::1]:7433",
    ];
    for node in with_port {
        assert!(!node_needs_resolution(node));
    }

    let bare_names = ["mac-mini", "linux-server", "localhost"];
    for node in bare_names {
        assert!(node_needs_resolution(node));
    }
}
4706
4707 #[tokio::test]
4708 async fn test_resolve_node_with_port() {
4709 let client = reqwest::Client::new();
4710 let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4711 assert_eq!(addr, "mac-mini:7433");
4712 assert!(token.is_none());
4713 }
4714
4715 #[tokio::test]
4716 async fn test_resolve_node_fallback_appends_port() {
4717 let client = reqwest::Client::new();
4720 let (addr, token) = resolve_node(&client, "unknown-host").await;
4721 assert_eq!(addr, "unknown-host:7433");
4722 assert!(token.is_none());
4723 }
4724
4725 #[cfg(not(coverage))]
4726 #[tokio::test]
4727 async fn test_resolve_node_finds_peer() {
4728 use axum::{Router, routing::get};
4729
4730 let app = Router::new()
4731 .route(
4732 "/api/v1/peers",
4733 get(|| async {
4734 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4735 }),
4736 )
4737 .route(
4738 "/api/v1/config",
4739 get(|| async {
4740 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4741 }),
4742 );
4743
4744 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4746 return;
4747 };
4748 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4749
4750 let client = reqwest::Client::new();
4751 let (addr, token) = resolve_node(&client, "mac-mini").await;
4752 assert_eq!(addr, "10.0.0.5:7433");
4753 assert_eq!(token, Some("peer-secret".into()));
4754 }
4755
4756 #[cfg(not(coverage))]
4757 #[tokio::test]
4758 async fn test_resolve_node_peer_no_token() {
4759 use axum::{Router, routing::get};
4760
4761 let app = Router::new()
4762 .route(
4763 "/api/v1/peers",
4764 get(|| async {
4765 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4766 }),
4767 )
4768 .route(
4769 "/api/v1/config",
4770 get(|| async {
4771 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4772 }),
4773 );
4774
4775 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4776 return; };
4778 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4779
4780 let client = reqwest::Client::new();
4781 let (addr, token) = resolve_node(&client, "test-peer").await;
4782 assert_eq!(addr, "10.0.0.9:7433");
4783 assert!(token.is_none()); }
4785
4786 #[tokio::test]
4787 async fn test_execute_with_peer_name_resolution() {
4788 let cli = Cli {
4792 node: "nonexistent-peer".into(),
4793 token: None,
4794 command: Some(Commands::List { all: false }),
4795 path: None,
4796 };
4797 let result = execute(&cli).await;
4798 assert!(result.is_err());
4800 }
4801
/// `pulpo spawn <name> --auto` parses to a `Spawn` command with `auto` set.
#[test]
fn test_cli_parse_spawn_auto() {
    let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
    let auto_enabled = match &cli.command {
        Some(Commands::Spawn { auto, .. }) => *auto,
        _ => false,
    };
    assert!(auto_enabled);
}
4812
/// Without `--auto`, `pulpo spawn <name>` parses to a `Spawn` command with
/// `auto` left unset.
#[test]
fn test_cli_parse_spawn_auto_default() {
    let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
    let auto_disabled = match &cli.command {
        Some(Commands::Spawn { auto, .. }) => !*auto,
        _ => false,
    };
    assert!(auto_disabled);
}
4821
4822 #[tokio::test]
4823 async fn test_select_best_node_coverage_stub() {
4824 let client = reqwest::Client::new();
4826 let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4829 }
4830
4831 #[cfg(not(coverage))]
4832 #[tokio::test]
4833 async fn test_select_best_node_picks_least_loaded() {
4834 use axum::{Router, routing::get};
4835
4836 let app = Router::new().route(
4837 "/api/v1/peers",
4838 get(|| async {
4839 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4840 }),
4841 );
4842 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4843 let addr = listener.local_addr().unwrap();
4844 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4845 let base = format!("http://127.0.0.1:{}", addr.port());
4846
4847 let client = reqwest::Client::new();
4848 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4849 assert_eq!(name, "idle");
4853 assert_eq!(addr, "idle:7433");
4854 }
4855
4856 #[cfg(not(coverage))]
4857 #[tokio::test]
4858 async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4859 use axum::{Router, routing::get};
4860
4861 let app = Router::new().route(
4862 "/api/v1/peers",
4863 get(|| async {
4864 r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4865 }),
4866 );
4867 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4868 let addr = listener.local_addr().unwrap();
4869 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4870 let base = format!("http://127.0.0.1:{}", addr.port());
4871
4872 let client = reqwest::Client::new();
4873 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4874 assert_eq!(name, "my-mac");
4875 assert_eq!(addr, "localhost:7433");
4876 }
4877
4878 #[cfg(not(coverage))]
4879 #[tokio::test]
4880 async fn test_select_best_node_empty_peers_falls_back_to_local() {
4881 use axum::{Router, routing::get};
4882
4883 let app = Router::new().route(
4884 "/api/v1/peers",
4885 get(|| async {
4886 r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4887 }),
4888 );
4889 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4890 let addr = listener.local_addr().unwrap();
4891 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4892 let base = format!("http://127.0.0.1:{}", addr.port());
4893
4894 let client = reqwest::Client::new();
4895 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4896 assert_eq!(name, "solo");
4897 assert_eq!(addr, "localhost:7433");
4898 }
4899
4900 #[cfg(not(coverage))]
4901 #[tokio::test]
4902 async fn test_execute_spawn_auto_selects_node() {
4903 use axum::{
4904 Router,
4905 http::StatusCode,
4906 routing::{get, post},
4907 };
4908
4909 let create_json = test_create_response_json();
4910
4911 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4913 let addr = listener.local_addr().unwrap();
4914 let node = format!("127.0.0.1:{}", addr.port());
4915 let peer_addr = node.clone();
4916
4917 let app = Router::new()
4918 .route(
4919 "/api/v1/peers",
4920 get(move || {
4921 let peer_addr = peer_addr.clone();
4922 async move {
4923 format!(
4924 r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
4925 )
4926 }
4927 }),
4928 )
4929 .route(
4930 "/api/v1/sessions",
4931 post(move || async move {
4932 (StatusCode::CREATED, create_json.clone())
4933 }),
4934 );
4935
4936 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4937
4938 let cli = Cli {
4939 node,
4940 token: None,
4941 command: Some(Commands::Spawn {
4942 name: Some("test".into()),
4943 workdir: Some("/tmp/repo".into()),
4944 ink: None,
4945 description: None,
4946 detach: true,
4947 idle_threshold: None,
4948 auto: true,
4949 worktree: false,
4950 worktree_base: None,
4951 runtime: None,
4952 secret: vec![],
4953 command: vec!["echo".into(), "hello".into()],
4954 }),
4955 path: None,
4956 };
4957 let result = execute(&cli).await.unwrap();
4958 assert!(result.contains("Created session"));
4959 }
4960
4961 #[cfg(not(coverage))]
4962 #[tokio::test]
4963 async fn test_select_best_node_peer_no_session_count() {
4964 use axum::{Router, routing::get};
4965
4966 let app = Router::new().route(
4967 "/api/v1/peers",
4968 get(|| async {
4969 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4970 }),
4971 );
4972 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4973 let addr = listener.local_addr().unwrap();
4974 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4975 let base = format!("http://127.0.0.1:{}", addr.port());
4976
4977 let client = reqwest::Client::new();
4978 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4979 assert_eq!(name, "fresh");
4981 assert_eq!(addr, "fresh:7433");
4982 }
4983
/// `pulpo secret set NAME VALUE` parses to `SecretAction::Set` with the given
/// name and value and no `env` override.
#[test]
fn test_cli_parse_secret_set() {
    let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_TOKEN", "abc123"]).unwrap();
    let parsed_as_expected = match &cli.command {
        Some(Commands::Secret {
            action: SecretAction::Set { name, value, env },
        }) => name == "MY_TOKEN" && value == "abc123" && env.is_none(),
        _ => false,
    };
    assert!(parsed_as_expected);
}
4995
/// `pulpo secret list` parses to `SecretAction::List`.
#[test]
fn test_cli_parse_secret_list() {
    let cli = Cli::try_parse_from(["pulpo", "secret", "list"]).unwrap();
    let is_list = matches!(
        cli.command,
        Some(Commands::Secret {
            action: SecretAction::List
        })
    );
    assert!(is_list);
}
5006
/// The `ls` alias (`pulpo secret ls`) parses to `SecretAction::List`.
#[test]
fn test_cli_parse_secret_list_alias() {
    let cli = Cli::try_parse_from(["pulpo", "secret", "ls"]).unwrap();
    let is_list = matches!(
        cli.command,
        Some(Commands::Secret {
            action: SecretAction::List
        })
    );
    assert!(is_list);
}
5017
/// `pulpo secret delete NAME` parses to `SecretAction::Delete` carrying that name.
#[test]
fn test_cli_parse_secret_delete() {
    let cli = Cli::try_parse_from(["pulpo", "secret", "delete", "MY_TOKEN"]).unwrap();
    let deletes_my_token = match &cli.command {
        Some(Commands::Secret {
            action: SecretAction::Delete { name },
        }) => name == "MY_TOKEN",
        _ => false,
    };
    assert!(deletes_my_token);
}
5027
/// The `rm` alias (`pulpo secret rm NAME`) parses to `SecretAction::Delete`
/// carrying that name.
#[test]
fn test_cli_parse_secret_delete_alias() {
    let cli = Cli::try_parse_from(["pulpo", "secret", "rm", "MY_TOKEN"]).unwrap();
    let deletes_my_token = match &cli.command {
        Some(Commands::Secret {
            action: SecretAction::Delete { name },
        }) => name == "MY_TOKEN",
        _ => false,
    };
    assert!(deletes_my_token);
}
5037
/// The top-level `sec` alias (`pulpo sec list`) reaches the `Secret`
/// subcommand and parses to `SecretAction::List`.
#[test]
fn test_cli_parse_secret_alias() {
    let cli = Cli::try_parse_from(["pulpo", "sec", "list"]).unwrap();
    let is_list = matches!(
        cli.command,
        Some(Commands::Secret {
            action: SecretAction::List
        })
    );
    assert!(is_list);
}
5048
/// An empty secret list renders as the fixed "No secrets configured." message.
#[test]
fn test_format_secrets_empty() {
    let no_secrets = Vec::<serde_json::Value>::new();
    let rendered = format_secrets(&no_secrets);
    assert_eq!(rendered, "No secrets configured.");
}
5054
/// Rendering two secrets must include both secret names and the `NAME`,
/// `ENV` and `CREATED` column headers.
#[test]
fn test_format_secrets_with_entries() {
    let github = serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"});
    let npm = serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"});
    let secrets = vec![github, npm];

    let output = format_secrets(&secrets);
    for expected in ["GITHUB_TOKEN", "NPM_TOKEN", "NAME", "ENV", "CREATED"] {
        assert!(output.contains(expected), "output is missing {expected:?}");
    }
}
5068
/// A secret with an explicit `env` must show both its own name and the env
/// value; a secret without `env` must still show its name.
#[test]
fn test_format_secrets_with_env() {
    let with_env = serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"});
    let without_env = serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"});
    let secrets = vec![with_env, without_env];

    let output = format_secrets(&secrets);
    for expected in ["GH_WORK", "GITHUB_TOKEN", "NPM_TOKEN"] {
        assert!(output.contains(expected), "output is missing {expected:?}");
    }
}
5080
/// A `created_at` value far shorter than a full timestamp (`"now"`) must
/// still appear in the rendered output.
#[test]
fn test_format_secrets_short_timestamp() {
    let entry = serde_json::json!({"name": "KEY", "created_at": "now"});
    let secrets = vec![entry];
    let rendered = format_secrets(&secrets);
    assert!(rendered.contains("now"));
}
5087
/// A `last_run_at` value far shorter than a full timestamp (`"short"`) must
/// still appear in the rendered schedule table.
#[test]
fn test_format_schedules_short_last_run_at() {
    let schedule = serde_json::json!({
        "name": "test",
        "cron": "* * * * *",
        "enabled": true,
        "last_run_at": "short",
        "target_node": null
    });
    let schedules = vec![schedule];
    let rendered = format_schedules(&schedules);
    assert!(rendered.contains("short"));
}
5101
/// Formats a session whose command is `echo '…'` wrapping a long run of
/// U+1F600 emoji and checks that the rendered table contains `...`.
///
/// NOTE(review): presumably guards against slicing a multi-byte character in
/// half when the command column is shortened — confirm against `format_sessions`.
#[test]
fn test_format_sessions_multibyte_command_truncation() {
    // Long command made entirely of multi-byte characters after the `echo '` prefix.
    let emoji_command = "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'";

    let session = Session {
        id: uuid::Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        command: emoji_command.into(),
        description: None,
        status: pulpo_common::session::SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        worktree_branch: None,
        git_branch: None,
        git_commit: None,
        git_files_changed: None,
        git_insertions: None,
        git_deletions: None,
        git_ahead: None,
        runtime: Runtime::Tmux,
        created_at: chrono::Utc::now(),
        updated_at: chrono::Utc::now(),
    };
    let sessions = vec![session];

    let rendered = format_sessions(&sessions);
    assert!(rendered.contains("..."));
}
5142}