1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5 AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6 PeersResponse,
7};
8use pulpo_common::session::{Runtime, Session};
9
10#[derive(Parser, Debug)]
11#[command(
12 name = "pulpo",
13 about = "Manage agent sessions across your machines",
14 version = env!("PULPO_VERSION"),
15 args_conflicts_with_subcommands = true
16)]
17pub struct Cli {
18 #[arg(long, default_value = "localhost:7433")]
20 pub node: String,
21
22 #[arg(long)]
24 pub token: Option<String>,
25
26 #[command(subcommand)]
27 pub command: Option<Commands>,
28
29 #[arg(value_name = "PATH")]
31 pub path: Option<String>,
32}
33
34#[derive(Subcommand, Debug)]
35#[allow(clippy::large_enum_variant)]
36pub enum Commands {
37 #[command(visible_alias = "a")]
39 Attach {
40 name: String,
42 },
43
44 #[command(visible_alias = "i", visible_alias = "send")]
46 Input {
47 name: String,
49 text: Option<String>,
51 },
52
53 #[command(visible_alias = "s")]
55 Spawn {
56 name: Option<String>,
58
59 #[arg(long)]
61 workdir: Option<String>,
62
63 #[arg(long)]
65 ink: Option<String>,
66
67 #[arg(long)]
69 description: Option<String>,
70
71 #[arg(short, long)]
73 detach: bool,
74
75 #[arg(long)]
77 idle_threshold: Option<u32>,
78
79 #[arg(long)]
81 auto: bool,
82
83 #[arg(long)]
85 worktree: bool,
86
87 #[arg(long = "worktree-base")]
89 worktree_base: Option<String>,
90
91 #[arg(long)]
93 runtime: Option<String>,
94
95 #[arg(long)]
97 secret: Vec<String>,
98
99 #[arg(last = true)]
101 command: Vec<String>,
102 },
103
104 #[command(visible_alias = "ls")]
106 List {
107 #[arg(short, long)]
109 all: bool,
110 },
111
112 #[command(visible_alias = "l")]
114 Logs {
115 name: String,
117
118 #[arg(long, default_value = "100")]
120 lines: usize,
121
122 #[arg(short, long)]
124 follow: bool,
125 },
126
127 #[command(visible_alias = "k", alias = "kill")]
129 Stop {
130 #[arg(required = true)]
132 names: Vec<String>,
133
134 #[arg(long, short = 'p')]
136 purge: bool,
137 },
138
139 Cleanup,
141
142 #[command(visible_alias = "r")]
144 Resume {
145 name: String,
147 },
148
149 #[command(visible_alias = "n")]
151 Nodes,
152
153 #[command(visible_alias = "iv")]
155 Interventions {
156 name: String,
158 },
159
160 Ui,
162
163 #[command(visible_alias = "sched")]
165 Schedule {
166 #[command(subcommand)]
167 action: ScheduleAction,
168 },
169
170 #[command(visible_alias = "sec")]
172 Secret {
173 #[command(subcommand)]
174 action: SecretAction,
175 },
176
177 #[command(visible_alias = "wt")]
179 Worktree {
180 #[command(subcommand)]
181 action: WorktreeAction,
182 },
183}
184
185#[derive(Subcommand, Debug)]
186pub enum SecretAction {
187 Set {
189 name: String,
191 value: String,
193 #[arg(long)]
195 env: Option<String>,
196 },
197 #[command(visible_alias = "ls")]
199 List,
200 #[command(visible_alias = "rm")]
202 Delete {
203 name: String,
205 },
206}
207
208#[derive(Subcommand, Debug)]
209pub enum WorktreeAction {
210 #[command(visible_alias = "ls")]
212 List,
213}
214
215#[derive(Subcommand, Debug)]
216pub enum ScheduleAction {
217 #[command(alias = "install")]
219 Add {
220 name: String,
222 cron: String,
224 #[arg(long)]
226 workdir: Option<String>,
227 #[arg(long)]
229 node: Option<String>,
230 #[arg(long)]
232 ink: Option<String>,
233 #[arg(long)]
235 description: Option<String>,
236 #[arg(last = true)]
238 command: Vec<String>,
239 },
240 #[command(alias = "ls")]
242 List,
243 #[command(alias = "rm")]
245 Remove {
246 name: String,
248 },
249 Pause {
251 name: String,
253 },
254 Resume {
256 name: String,
258 },
259}
260
261const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
263
264fn resolve_path(path: &str) -> String {
266 let p = std::path::Path::new(path);
267 if p.is_absolute() {
268 path.to_owned()
269 } else {
270 std::env::current_dir().map_or_else(
271 |_| path.to_owned(),
272 |cwd| cwd.join(p).to_string_lossy().into_owned(),
273 )
274 }
275}
276
277fn derive_session_name(path: &str) -> String {
279 let basename = std::path::Path::new(path)
280 .file_name()
281 .and_then(|n| n.to_str())
282 .unwrap_or("session");
283 let kebab: String = basename
285 .chars()
286 .map(|c| {
287 if c.is_ascii_alphanumeric() {
288 c.to_ascii_lowercase()
289 } else {
290 '-'
291 }
292 })
293 .collect();
294 let mut result = String::new();
296 for c in kebab.chars() {
297 if c == '-' && result.ends_with('-') {
298 continue;
299 }
300 result.push(c);
301 }
302 let result = result.trim_matches('-').to_owned();
303 if result.is_empty() {
304 "session".to_owned()
305 } else {
306 result
307 }
308}
309
310async fn deduplicate_session_name(
312 client: &reqwest::Client,
313 base: &str,
314 name: &str,
315 token: Option<&str>,
316) -> String {
317 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
319 .send()
320 .await;
321 match resp {
322 Ok(r) if r.status().is_success() => {
323 for i in 2..=99 {
325 let candidate = format!("{name}-{i}");
326 let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
327 .send()
328 .await;
329 match resp {
330 Ok(r) if r.status().is_success() => {}
331 _ => return candidate,
332 }
333 }
334 format!("{name}-100")
335 }
336 _ => name.to_owned(),
337 }
338}
339
340pub fn base_url(node: &str) -> String {
342 format!("http://{node}")
343}
344
345#[derive(serde::Deserialize)]
347struct OutputResponse {
348 output: String,
349}
350
351const fn session_runtime(session: &Session) -> &'static str {
353 match session.runtime {
354 Runtime::Tmux => "tmux",
355 Runtime::Docker => "docker",
356 }
357}
358
359fn format_sessions(sessions: &[Session]) -> String {
360 if sessions.is_empty() {
361 return "No sessions.".into();
362 }
363 let mut lines = vec![format!(
364 "{:<10} {:<24} {:<12} {:<8} {}",
365 "ID", "NAME", "STATUS", "RUNTIME", "COMMAND"
366 )];
367 for s in sessions {
368 let cmd_display = if s.command.len() > 50 {
369 let truncated: String = s.command.chars().take(47).collect();
370 format!("{truncated}...")
371 } else {
372 s.command.clone()
373 };
374 let short_id = &s.id.to_string()[..8];
375 let has_pr = s
376 .metadata
377 .as_ref()
378 .is_some_and(|m| m.contains_key("pr_url"));
379 let name_display = match (s.worktree_path.is_some(), has_pr) {
380 (true, true) => format!("{} [wt] [PR]", s.name),
381 (true, false) => format!("{} [wt]", s.name),
382 (false, true) => format!("{} [PR]", s.name),
383 (false, false) => s.name.clone(),
384 };
385 lines.push(format!(
386 "{:<10} {:<24} {:<12} {:<8} {}",
387 short_id,
388 name_display,
389 s.status,
390 session_runtime(s),
391 cmd_display
392 ));
393 }
394 lines.join("\n")
395}
396
397fn format_nodes(resp: &PeersResponse) -> String {
399 let mut lines = vec![format!(
400 "{:<20} {:<25} {:<10} {}",
401 "NAME", "ADDRESS", "STATUS", "SESSIONS"
402 )];
403 lines.push(format!(
404 "{:<20} {:<25} {:<10} {}",
405 resp.local.name, "(local)", "online", "-"
406 ));
407 for p in &resp.peers {
408 let sessions = p
409 .session_count
410 .map_or_else(|| "-".into(), |c| c.to_string());
411 lines.push(format!(
412 "{:<20} {:<25} {:<10} {}",
413 p.name, p.address, p.status, sessions
414 ));
415 }
416 lines.join("\n")
417}
418
419fn format_interventions(events: &[InterventionEventResponse]) -> String {
421 if events.is_empty() {
422 return "No intervention events.".into();
423 }
424 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
425 for e in events {
426 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
427 }
428 lines.join("\n")
429}
430
431#[cfg_attr(coverage, allow(dead_code))]
433fn build_open_command(url: &str) -> std::process::Command {
434 #[cfg(target_os = "macos")]
435 {
436 let mut cmd = std::process::Command::new("open");
437 cmd.arg(url);
438 cmd
439 }
440 #[cfg(target_os = "linux")]
441 {
442 let mut cmd = std::process::Command::new("xdg-open");
443 cmd.arg(url);
444 cmd
445 }
446 #[cfg(target_os = "windows")]
447 {
448 let mut cmd = std::process::Command::new("cmd");
449 cmd.args(["/C", "start", url]);
450 cmd
451 }
452 #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
453 {
454 let mut cmd = std::process::Command::new("xdg-open");
456 cmd.arg(url);
457 cmd
458 }
459}
460
461#[cfg(not(coverage))]
463fn open_browser(url: &str) -> Result<()> {
464 build_open_command(url).status()?;
465 Ok(())
466}
467
468#[cfg(coverage)]
470fn open_browser(_url: &str) -> Result<()> {
471 Ok(())
472}
473
474#[cfg_attr(coverage, allow(dead_code))]
477fn build_attach_command(backend_session_id: &str) -> std::process::Command {
478 if let Some(container) = backend_session_id.strip_prefix("docker:") {
480 let mut cmd = std::process::Command::new("docker");
481 cmd.args(["exec", "-it", container, "/bin/sh"]);
482 return cmd;
483 }
484 #[cfg(not(target_os = "windows"))]
486 {
487 let mut cmd = std::process::Command::new("tmux");
488 cmd.args(["attach-session", "-t", backend_session_id]);
489 cmd
490 }
491 #[cfg(target_os = "windows")]
492 {
493 let mut cmd = std::process::Command::new("cmd");
495 cmd.args([
496 "/C",
497 "echo",
498 "Attach not available on Windows. Use the web UI or --runtime docker.",
499 ]);
500 cmd
501 }
502}
503
504#[cfg(not(any(test, coverage, target_os = "windows")))]
506fn attach_session(backend_session_id: &str) -> Result<()> {
507 let status = build_attach_command(backend_session_id).status()?;
508 if !status.success() {
509 anyhow::bail!("attach failed with {status}");
510 }
511 Ok(())
512}
513
514#[cfg(all(target_os = "windows", not(test), not(coverage)))]
516fn attach_session(_backend_session_id: &str) -> Result<()> {
517 eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
518 Ok(())
519}
520
521#[cfg(any(test, coverage))]
523#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
524fn attach_session(_backend_session_id: &str) -> Result<()> {
525 Ok(())
526}
527
528fn api_error(text: &str) -> anyhow::Error {
530 serde_json::from_str::<serde_json::Value>(text)
531 .ok()
532 .and_then(|v| v["error"].as_str().map(String::from))
533 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
534}
535
536async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
538 if resp.status().is_success() {
539 Ok(resp.text().await?)
540 } else {
541 let text = resp.text().await?;
542 Err(api_error(&text))
543 }
544}
545
546fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
548 if err.is_connect() {
549 anyhow::anyhow!(
550 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
551 )
552 } else {
553 anyhow::anyhow!("Network error connecting to {node}: {err}")
554 }
555}
556
557fn is_localhost(node: &str) -> bool {
559 let host = node.split(':').next().unwrap_or(node);
560 host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
561}
562
563async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
565 let resp = client
566 .get(format!("{base}/api/v1/auth/token"))
567 .send()
568 .await
569 .ok()?;
570 let body: AuthTokenResponse = resp.json().await.ok()?;
571 if body.token.is_empty() {
572 None
573 } else {
574 Some(body.token)
575 }
576}
577
578async fn resolve_token(
580 client: &reqwest::Client,
581 base: &str,
582 node: &str,
583 explicit: Option<&str>,
584) -> Option<String> {
585 if let Some(t) = explicit {
586 return Some(t.to_owned());
587 }
588 if is_localhost(node) {
589 return discover_token(client, base).await;
590 }
591 None
592}
593
594fn node_needs_resolution(node: &str) -> bool {
596 !node.contains(':')
597}
598
599#[cfg(not(coverage))]
606async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
607 if !node_needs_resolution(node) {
609 return (node.to_owned(), None);
610 }
611
612 let local_base = "http://localhost:7433";
614 let mut resolved_address: Option<String> = None;
615
616 if let Ok(resp) = client
617 .get(format!("{local_base}/api/v1/peers"))
618 .send()
619 .await
620 && let Ok(peers_resp) = resp.json::<PeersResponse>().await
621 {
622 for peer in &peers_resp.peers {
623 if peer.name == node {
624 resolved_address = Some(peer.address.clone());
625 break;
626 }
627 }
628 }
629
630 let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
631
632 let peer_token = if let Ok(resp) = client
634 .get(format!("{local_base}/api/v1/config"))
635 .send()
636 .await
637 && let Ok(config) = resp.json::<ConfigResponse>().await
638 && let Some(entry) = config.peers.get(node)
639 {
640 entry.token().map(String::from)
641 } else {
642 None
643 };
644
645 (address, peer_token)
646}
647
648#[cfg(coverage)]
650async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
651 if node_needs_resolution(node) {
652 (format!("{node}:7433"), None)
653 } else {
654 (node.to_owned(), None)
655 }
656}
657
658fn authed_get(
660 client: &reqwest::Client,
661 url: String,
662 token: Option<&str>,
663) -> reqwest::RequestBuilder {
664 let req = client.get(url);
665 if let Some(t) = token {
666 req.bearer_auth(t)
667 } else {
668 req
669 }
670}
671
672fn authed_post(
674 client: &reqwest::Client,
675 url: String,
676 token: Option<&str>,
677) -> reqwest::RequestBuilder {
678 let req = client.post(url);
679 if let Some(t) = token {
680 req.bearer_auth(t)
681 } else {
682 req
683 }
684}
685
686fn authed_delete(
688 client: &reqwest::Client,
689 url: String,
690 token: Option<&str>,
691) -> reqwest::RequestBuilder {
692 let req = client.delete(url);
693 if let Some(t) = token {
694 req.bearer_auth(t)
695 } else {
696 req
697 }
698}
699
700#[cfg(not(coverage))]
702fn authed_put(
703 client: &reqwest::Client,
704 url: String,
705 token: Option<&str>,
706) -> reqwest::RequestBuilder {
707 let req = client.put(url);
708 if let Some(t) = token {
709 req.bearer_auth(t)
710 } else {
711 req
712 }
713}
714
715async fn fetch_output(
717 client: &reqwest::Client,
718 base: &str,
719 name: &str,
720 lines: usize,
721 token: Option<&str>,
722) -> Result<String> {
723 let resp = authed_get(
724 client,
725 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
726 token,
727 )
728 .send()
729 .await?;
730 let text = ok_or_api_error(resp).await?;
731 let output: OutputResponse = serde_json::from_str(&text)?;
732 Ok(output.output)
733}
734
735async fn fetch_session_status(
737 client: &reqwest::Client,
738 base: &str,
739 name: &str,
740 token: Option<&str>,
741) -> Result<String> {
742 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
743 .send()
744 .await?;
745 let text = ok_or_api_error(resp).await?;
746 let session: Session = serde_json::from_str(&text)?;
747 Ok(session.status.to_string())
748}
749
750async fn check_session_alive(
754 client: &reqwest::Client,
755 base: &str,
756 session_id: &str,
757 token: Option<&str>,
758) -> Result<()> {
759 for _ in 0..3 {
761 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
762 let resp = authed_get(
764 client,
765 format!("{base}/api/v1/sessions/{session_id}"),
766 token,
767 )
768 .send()
769 .await;
770 if let Ok(resp) = resp
771 && let Ok(text) = ok_or_api_error(resp).await
772 && let Ok(session) = serde_json::from_str::<Session>(&text)
773 {
774 match session.status.to_string().as_str() {
775 "creating" => continue,
776 "lost" | "stopped" => {
777 anyhow::bail!(
778 "Session \"{}\" exited immediately — the command may have failed.\n Check logs: pulpo logs {}",
779 session.name,
780 session.name
781 );
782 }
783 _ => return Ok(()),
784 }
785 }
786 break;
788 }
789 Ok(())
790}
791
792fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
799 if prev.is_empty() {
800 return new;
801 }
802
803 let prev_lines: Vec<&str> = prev.lines().collect();
804 let new_lines: Vec<&str> = new.lines().collect();
805
806 if new_lines.is_empty() {
807 return "";
808 }
809
810 let last_prev = prev_lines[prev_lines.len() - 1];
812
813 for i in (0..new_lines.len()).rev() {
815 if new_lines[i] == last_prev {
816 let overlap_len = prev_lines.len().min(i + 1);
818 let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
819 let new_overlap = &new_lines[i + 1 - overlap_len..=i];
820 if prev_tail == new_overlap {
821 if i + 1 < new_lines.len() {
822 let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
824 return new.get(consumed.min(new.len())..).unwrap_or("");
825 }
826 return "";
827 }
828 }
829 }
830
831 new
833}
834
835async fn follow_logs(
837 client: &reqwest::Client,
838 base: &str,
839 name: &str,
840 lines: usize,
841 token: Option<&str>,
842 writer: &mut (dyn std::io::Write + Send),
843) -> Result<()> {
844 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
845 write!(writer, "{prev_output}")?;
846
847 let mut unchanged_ticks: u32 = 0;
848
849 loop {
850 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
851
852 let new_output = fetch_output(client, base, name, lines, token).await?;
854
855 let diff = diff_output(&prev_output, &new_output);
856 if diff.is_empty() {
857 unchanged_ticks += 1;
858 } else {
859 write!(writer, "{diff}")?;
860 unchanged_ticks = 0;
861 }
862
863 if new_output.contains(AGENT_EXIT_MARKER) {
865 break;
866 }
867
868 prev_output = new_output;
869
870 if unchanged_ticks >= 3 {
872 let status = fetch_session_status(client, base, name, token).await?;
873 let is_terminal = status == "ready" || status == "stopped" || status == "lost";
874 if is_terminal {
875 break;
876 }
877 }
878 }
879 Ok(())
880}
881
882#[cfg(not(coverage))]
886async fn execute_schedule(
887 client: &reqwest::Client,
888 action: &ScheduleAction,
889 base: &str,
890 token: Option<&str>,
891) -> Result<String> {
892 match action {
893 ScheduleAction::Add {
894 name,
895 cron,
896 workdir,
897 node,
898 ink,
899 description,
900 command,
901 } => {
902 let cmd = if command.is_empty() {
903 None
904 } else {
905 Some(command.join(" "))
906 };
907 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
908 std::env::current_dir()
909 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
910 });
911 let mut body = serde_json::json!({
912 "name": name,
913 "cron": cron,
914 "workdir": resolved_workdir,
915 });
916 if let Some(c) = &cmd {
917 body["command"] = serde_json::json!(c);
918 }
919 if let Some(n) = node {
920 body["target_node"] = serde_json::json!(n);
921 }
922 if let Some(i) = ink {
923 body["ink"] = serde_json::json!(i);
924 }
925 if let Some(d) = description {
926 body["description"] = serde_json::json!(d);
927 }
928 let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
929 .json(&body)
930 .send()
931 .await?;
932 ok_or_api_error(resp).await?;
933 Ok(format!("Created schedule \"{name}\""))
934 }
935 ScheduleAction::List => {
936 let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
937 .send()
938 .await?;
939 let text = ok_or_api_error(resp).await?;
940 let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
941 Ok(format_schedules(&schedules))
942 }
943 ScheduleAction::Remove { name } => {
944 let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
945 .send()
946 .await?;
947 ok_or_api_error(resp).await?;
948 Ok(format!("Removed schedule \"{name}\""))
949 }
950 ScheduleAction::Pause { name } => {
951 let body = serde_json::json!({ "enabled": false });
952 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
953 .json(&body)
954 .send()
955 .await?;
956 ok_or_api_error(resp).await?;
957 Ok(format!("Paused schedule \"{name}\""))
958 }
959 ScheduleAction::Resume { name } => {
960 let body = serde_json::json!({ "enabled": true });
961 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
962 .json(&body)
963 .send()
964 .await?;
965 ok_or_api_error(resp).await?;
966 Ok(format!("Resumed schedule \"{name}\""))
967 }
968 }
969}
970
971#[cfg(coverage)]
973#[allow(clippy::unnecessary_wraps)]
974async fn execute_schedule(
975 _client: &reqwest::Client,
976 _action: &ScheduleAction,
977 _base: &str,
978 _token: Option<&str>,
979) -> Result<String> {
980 Ok(String::new())
981}
982
983#[cfg_attr(coverage, allow(dead_code))]
987fn format_secrets(secrets: &[serde_json::Value]) -> String {
988 if secrets.is_empty() {
989 return "No secrets configured.".into();
990 }
991 let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
992 for s in secrets {
993 let name = s["name"].as_str().unwrap_or("?");
994 let env_display = s["env"]
995 .as_str()
996 .map_or_else(|| name.to_owned(), String::from);
997 let created = s["created_at"]
998 .as_str()
999 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1000 lines.push(format!("{name:<24} {env_display:<24} {created}"));
1001 }
1002 lines.join("\n")
1003}
1004
1005#[cfg(not(coverage))]
1007async fn execute_secret(
1008 client: &reqwest::Client,
1009 action: &SecretAction,
1010 base: &str,
1011 token: Option<&str>,
1012) -> Result<String> {
1013 match action {
1014 SecretAction::Set { name, value, env } => {
1015 let mut body = serde_json::json!({ "value": value });
1016 if let Some(e) = env {
1017 body["env"] = serde_json::json!(e);
1018 }
1019 let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
1020 .json(&body)
1021 .send()
1022 .await?;
1023 ok_or_api_error(resp).await?;
1024 Ok(format!("Secret \"{name}\" set."))
1025 }
1026 SecretAction::List => {
1027 let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
1028 .send()
1029 .await?;
1030 let text = ok_or_api_error(resp).await?;
1031 let parsed: serde_json::Value = serde_json::from_str(&text)?;
1032 let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1033 Ok(format_secrets(secrets))
1034 }
1035 SecretAction::Delete { name } => {
1036 let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1037 .send()
1038 .await?;
1039 ok_or_api_error(resp).await?;
1040 Ok(format!("Secret \"{name}\" deleted."))
1041 }
1042 }
1043}
1044
1045#[cfg(coverage)]
1047#[allow(clippy::unnecessary_wraps)]
1048async fn execute_secret(
1049 _client: &reqwest::Client,
1050 _action: &SecretAction,
1051 _base: &str,
1052 _token: Option<&str>,
1053) -> Result<String> {
1054 Ok(String::new())
1055}
1056
1057#[cfg(not(coverage))]
1059async fn execute_worktree(
1060 client: &reqwest::Client,
1061 action: &WorktreeAction,
1062 base: &str,
1063 token: Option<&str>,
1064) -> Result<String> {
1065 match action {
1066 WorktreeAction::List => {
1067 let resp = authed_get(client, format!("{base}/api/v1/sessions"), token)
1068 .send()
1069 .await?;
1070 let text = ok_or_api_error(resp).await?;
1071 let sessions: Vec<Session> = serde_json::from_str(&text)?;
1072 let wt_sessions: Vec<&Session> = sessions
1073 .iter()
1074 .filter(|s| s.worktree_path.is_some())
1075 .collect();
1076 Ok(format_worktree_sessions(&wt_sessions))
1077 }
1078 }
1079}
1080
1081#[cfg(coverage)]
1083#[allow(clippy::unnecessary_wraps)]
1084async fn execute_worktree(
1085 _client: &reqwest::Client,
1086 _action: &WorktreeAction,
1087 _base: &str,
1088 _token: Option<&str>,
1089) -> Result<String> {
1090 Ok(String::new())
1091}
1092
1093fn format_worktree_sessions(sessions: &[&Session]) -> String {
1095 if sessions.is_empty() {
1096 return "No worktree sessions.".into();
1097 }
1098 let mut lines = vec![format!(
1099 "{:<20} {:<20} {:<10} {}",
1100 "NAME", "BRANCH", "STATUS", "PATH"
1101 )];
1102 for s in sessions {
1103 let branch = s.worktree_branch.as_deref().unwrap_or("-");
1104 let path = s.worktree_path.as_deref().unwrap_or("-");
1105 lines.push(format!(
1106 "{:<20} {:<20} {:<10} {}",
1107 s.name, branch, s.status, path
1108 ));
1109 }
1110 lines.join("\n")
1111}
1112
1113#[cfg_attr(coverage, allow(dead_code))]
1115fn format_schedules(schedules: &[serde_json::Value]) -> String {
1116 if schedules.is_empty() {
1117 return "No schedules.".into();
1118 }
1119 let mut lines = vec![format!(
1120 "{:<20} {:<18} {:<8} {:<12} {}",
1121 "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1122 )];
1123 for s in schedules {
1124 let name = s["name"].as_str().unwrap_or("?");
1125 let cron = s["cron"].as_str().unwrap_or("?");
1126 let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1127 "yes"
1128 } else {
1129 "no"
1130 };
1131 let last_run = s["last_run_at"]
1132 .as_str()
1133 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1134 let node = s["target_node"].as_str().unwrap_or("local");
1135 lines.push(format!(
1136 "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1137 ));
1138 }
1139 lines.join("\n")
1140}
1141
1142#[cfg(not(coverage))]
1146async fn select_best_node(
1147 client: &reqwest::Client,
1148 base: &str,
1149 token: Option<&str>,
1150) -> Result<(String, String)> {
1151 let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1152 .send()
1153 .await?;
1154 let text = ok_or_api_error(resp).await?;
1155 let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1156
1157 let mut best: Option<(String, String, f64)> = None; for peer in &peers_resp.peers {
1161 if peer.status != pulpo_common::peer::PeerStatus::Online {
1162 continue;
1163 }
1164 let sessions = peer.session_count.unwrap_or(0);
1165 let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1166 #[allow(clippy::cast_precision_loss)]
1168 let score = sessions as f64 - (mem as f64 / 1024.0);
1169 if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1170 best = Some((peer.address.clone(), peer.name.clone(), score));
1171 }
1172 }
1173
1174 match best {
1176 Some((addr, name, _)) => Ok((addr, name)),
1177 None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1178 }
1179}
1180
1181#[cfg(coverage)]
1183#[allow(clippy::unnecessary_wraps)]
1184async fn select_best_node(
1185 _client: &reqwest::Client,
1186 _base: &str,
1187 _token: Option<&str>,
1188) -> Result<(String, String)> {
1189 Ok(("localhost:7433".into(), "local".into()))
1190}
1191
1192#[allow(clippy::too_many_lines)]
1194pub async fn execute(cli: &Cli) -> Result<String> {
1195 let client = reqwest::Client::new();
1196 let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
1197 let url = base_url(&resolved_node);
1198 let node = &resolved_node;
1199 let token = resolve_token(&client, &url, node, cli.token.as_deref())
1200 .await
1201 .or(peer_token);
1202
1203 if cli.command.is_none() && cli.path.is_none() {
1205 use clap::CommandFactory;
1207 let mut cmd = Cli::command();
1208 cmd.print_help()?;
1209 println!();
1210 return Ok(String::new());
1211 }
1212 if cli.command.is_none() {
1213 let path = cli.path.as_deref().unwrap_or(".");
1214 let resolved_workdir = resolve_path(path);
1215 let base_name = derive_session_name(&resolved_workdir);
1216 let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
1217 let body = serde_json::json!({
1218 "name": name,
1219 "workdir": resolved_workdir,
1220 });
1221 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1222 .json(&body)
1223 .send()
1224 .await
1225 .map_err(|e| friendly_error(&e, node))?;
1226 let text = ok_or_api_error(resp).await?;
1227 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1228 let msg = format!(
1229 "Created session \"{}\" ({})",
1230 resp.session.name, resp.session.id
1231 );
1232 let backend_id = resp
1233 .session
1234 .backend_session_id
1235 .as_deref()
1236 .unwrap_or(&resp.session.name);
1237 eprintln!("{msg}");
1238 attach_session(backend_id)?;
1241 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1242 }
1243
1244 match cli.command.as_ref().unwrap() {
1245 Commands::Attach { name } => {
1246 let resp = authed_get(
1248 &client,
1249 format!("{url}/api/v1/sessions/{name}"),
1250 token.as_deref(),
1251 )
1252 .send()
1253 .await
1254 .map_err(|e| friendly_error(&e, node))?;
1255 let text = ok_or_api_error(resp).await?;
1256 let session: Session = serde_json::from_str(&text)?;
1257 match session.status.to_string().as_str() {
1258 "lost" => {
1259 anyhow::bail!(
1260 "Session \"{name}\" is lost (agent process died). Resume it first:\n pulpo resume {name}"
1261 );
1262 }
1263 "stopped" => {
1264 anyhow::bail!(
1265 "Session \"{name}\" is {} — cannot attach to a stopped session.",
1266 session.status
1267 );
1268 }
1269 _ => {}
1270 }
1271 let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1272 attach_session(&backend_id)?;
1273 Ok(format!("Detached from session {name}."))
1274 }
1275 Commands::Input { name, text } => {
1276 let input_text = text.as_deref().unwrap_or("\n");
1277 let body = serde_json::json!({ "text": input_text });
1278 let resp = authed_post(
1279 &client,
1280 format!("{url}/api/v1/sessions/{name}/input"),
1281 token.as_deref(),
1282 )
1283 .json(&body)
1284 .send()
1285 .await
1286 .map_err(|e| friendly_error(&e, node))?;
1287 ok_or_api_error(resp).await?;
1288 Ok(format!("Sent input to session {name}."))
1289 }
1290 Commands::List { all } => {
1291 let list_url = if *all {
1292 format!("{url}/api/v1/sessions")
1293 } else {
1294 format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
1295 };
1296 let resp = authed_get(&client, list_url, token.as_deref())
1297 .send()
1298 .await
1299 .map_err(|e| friendly_error(&e, node))?;
1300 let text = ok_or_api_error(resp).await?;
1301 let sessions: Vec<Session> = serde_json::from_str(&text)?;
1302 Ok(format_sessions(&sessions))
1303 }
1304 Commands::Nodes => {
1305 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1306 .send()
1307 .await
1308 .map_err(|e| friendly_error(&e, node))?;
1309 let text = ok_or_api_error(resp).await?;
1310 let resp: PeersResponse = serde_json::from_str(&text)?;
1311 Ok(format_nodes(&resp))
1312 }
1313 Commands::Spawn {
1314 workdir,
1315 name,
1316 ink,
1317 description,
1318 detach,
1319 idle_threshold,
1320 auto,
1321 worktree,
1322 worktree_base,
1323 runtime,
1324 secret,
1325 command,
1326 } => {
1327 let cmd = if command.is_empty() {
1328 None
1329 } else {
1330 Some(command.join(" "))
1331 };
1332 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1334 std::env::current_dir()
1335 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1336 });
1337 let resolved_name = if let Some(n) = name {
1339 n.clone()
1340 } else {
1341 let base_name = derive_session_name(&resolved_workdir);
1342 deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1343 };
1344 let mut body = serde_json::json!({
1345 "name": resolved_name,
1346 "workdir": resolved_workdir,
1347 });
1348 if let Some(c) = &cmd {
1349 body["command"] = serde_json::json!(c);
1350 }
1351 if let Some(i) = ink {
1352 body["ink"] = serde_json::json!(i);
1353 }
1354 if let Some(d) = description {
1355 body["description"] = serde_json::json!(d);
1356 }
1357 if let Some(t) = idle_threshold {
1358 body["idle_threshold_secs"] = serde_json::json!(t);
1359 }
1360 if *worktree || worktree_base.is_some() {
1362 body["worktree"] = serde_json::json!(true);
1363 if let Some(base) = worktree_base {
1364 body["worktree_base"] = serde_json::json!(base);
1365 eprintln!(
1366 "Worktree: branch {resolved_name} (from {base}) in ~/.pulpo/worktrees/{resolved_name}/"
1367 );
1368 } else {
1369 eprintln!(
1370 "Worktree: branch {resolved_name} in ~/.pulpo/worktrees/{resolved_name}/"
1371 );
1372 }
1373 }
1374 if let Some(rt) = runtime {
1375 body["runtime"] = serde_json::json!(rt);
1376 }
1377 if !secret.is_empty() {
1378 body["secrets"] = serde_json::json!(secret);
1379 }
1380 let spawn_url = if *auto {
1381 let (auto_addr, auto_name) =
1382 select_best_node(&client, &url, token.as_deref()).await?;
1383 eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1384 base_url(&auto_addr)
1385 } else {
1386 url.clone()
1387 };
1388 let resp = authed_post(
1389 &client,
1390 format!("{spawn_url}/api/v1/sessions"),
1391 token.as_deref(),
1392 )
1393 .json(&body)
1394 .send()
1395 .await
1396 .map_err(|e| friendly_error(&e, node))?;
1397 let text = ok_or_api_error(resp).await?;
1398 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1399 let msg = format!(
1400 "Created session \"{}\" ({})",
1401 resp.session.name, resp.session.id
1402 );
1403 if !detach {
1404 let backend_id = resp
1405 .session
1406 .backend_session_id
1407 .as_deref()
1408 .unwrap_or(&resp.session.name);
1409 eprintln!("{msg}");
1410 if cmd.is_some() {
1413 let sid = resp.session.id.to_string();
1414 check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1415 }
1416 attach_session(backend_id)?;
1417 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1418 }
1419 Ok(msg)
1420 }
1421 Commands::Stop { names, purge } => {
1422 let mut results = Vec::new();
1423 for name in names {
1424 let query = if *purge { "?purge=true" } else { "" };
1425 let resp = authed_post(
1426 &client,
1427 format!("{url}/api/v1/sessions/{name}/stop{query}"),
1428 token.as_deref(),
1429 )
1430 .send()
1431 .await
1432 .map_err(|e| friendly_error(&e, node))?;
1433 let action = if *purge {
1434 "stopped and purged"
1435 } else {
1436 "stopped"
1437 };
1438 match ok_or_api_error(resp).await {
1439 Ok(_) => results.push(format!("Session {name} {action}.")),
1440 Err(e) => results.push(format!("Error stopping {name}: {e}")),
1441 }
1442 }
1443 Ok(results.join("\n"))
1444 }
1445 Commands::Cleanup => {
1446 let resp = authed_post(
1447 &client,
1448 format!("{url}/api/v1/sessions/cleanup"),
1449 token.as_deref(),
1450 )
1451 .send()
1452 .await
1453 .map_err(|e| friendly_error(&e, node))?;
1454 let text = ok_or_api_error(resp).await?;
1455 let result: serde_json::Value = serde_json::from_str(&text)?;
1456 let count = result["deleted"].as_u64().unwrap_or(0);
1457 if count == 0 {
1458 Ok("No stopped or lost sessions to clean up.".into())
1459 } else {
1460 Ok(format!("Cleaned up {count} session(s)."))
1461 }
1462 }
1463 Commands::Logs {
1464 name,
1465 lines,
1466 follow,
1467 } => {
1468 if *follow {
1469 let mut stdout = std::io::stdout();
1470 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1471 .await
1472 .map_err(|e| {
1473 match e.downcast::<reqwest::Error>() {
1475 Ok(re) => friendly_error(&re, node),
1476 Err(other) => other,
1477 }
1478 })?;
1479 Ok(String::new())
1480 } else {
1481 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1482 .await
1483 .map_err(|e| match e.downcast::<reqwest::Error>() {
1484 Ok(re) => friendly_error(&re, node),
1485 Err(other) => other,
1486 })?;
1487 Ok(output)
1488 }
1489 }
1490 Commands::Interventions { name } => {
1491 let resp = authed_get(
1492 &client,
1493 format!("{url}/api/v1/sessions/{name}/interventions"),
1494 token.as_deref(),
1495 )
1496 .send()
1497 .await
1498 .map_err(|e| friendly_error(&e, node))?;
1499 let text = ok_or_api_error(resp).await?;
1500 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1501 Ok(format_interventions(&events))
1502 }
1503 Commands::Ui => {
1504 let dashboard = base_url(node);
1505 open_browser(&dashboard)?;
1506 Ok(format!("Opening {dashboard}"))
1507 }
1508 Commands::Resume { name } => {
1509 let resp = authed_post(
1510 &client,
1511 format!("{url}/api/v1/sessions/{name}/resume"),
1512 token.as_deref(),
1513 )
1514 .send()
1515 .await
1516 .map_err(|e| friendly_error(&e, node))?;
1517 let text = ok_or_api_error(resp).await?;
1518 let session: Session = serde_json::from_str(&text)?;
1519 let backend_id = session
1520 .backend_session_id
1521 .as_deref()
1522 .unwrap_or(&session.name);
1523 eprintln!("Resumed session \"{}\"", session.name);
1524 let sid = session.id.to_string();
1525 check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1526 attach_session(backend_id)?;
1527 Ok(format!("Detached from session \"{}\".", session.name))
1528 }
1529 Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1530 .await
1531 .map_err(|e| match e.downcast::<reqwest::Error>() {
1532 Ok(re) => friendly_error(&re, node),
1533 Err(other) => other,
1534 }),
1535 Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
1536 .await
1537 .map_err(|e| match e.downcast::<reqwest::Error>() {
1538 Ok(re) => friendly_error(&re, node),
1539 Err(other) => other,
1540 }),
1541 Commands::Worktree { action } => execute_worktree(&client, action, &url, token.as_deref())
1542 .await
1543 .map_err(|e| match e.downcast::<reqwest::Error>() {
1544 Ok(re) => friendly_error(&re, node),
1545 Err(other) => other,
1546 }),
1547 }
1548}
1549
1550#[cfg(test)]
1551mod tests {
1552 use super::*;
1553
1554 #[test]
1555 fn test_base_url() {
1556 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1557 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1558 }
1559
1560 #[test]
1561 fn test_cli_parse_list() {
1562 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1563 assert_eq!(cli.node, "localhost:7433");
1564 assert!(matches!(cli.command, Some(Commands::List { .. })));
1565 }
1566
1567 #[test]
1568 fn test_cli_parse_nodes() {
1569 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1570 assert!(matches!(cli.command, Some(Commands::Nodes)));
1571 }
1572
1573 #[test]
1574 fn test_cli_parse_ui() {
1575 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1576 assert!(matches!(cli.command, Some(Commands::Ui)));
1577 }
1578
1579 #[test]
1580 fn test_cli_parse_ui_custom_node() {
1581 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1582 assert_eq!(cli.node, "mac-mini:7433");
1584 assert_eq!(cli.path.as_deref(), Some("ui"));
1585 }
1586
1587 #[test]
1588 fn test_build_open_command() {
1589 let cmd = build_open_command("http://localhost:7433");
1590 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1591 assert_eq!(args, vec!["http://localhost:7433"]);
1592 #[cfg(target_os = "macos")]
1593 assert_eq!(cmd.get_program(), "open");
1594 #[cfg(target_os = "linux")]
1595 assert_eq!(cmd.get_program(), "xdg-open");
1596 }
1597
1598 #[test]
1599 fn test_cli_parse_spawn() {
1600 let cli = Cli::try_parse_from([
1601 "pulpo",
1602 "spawn",
1603 "my-task",
1604 "--workdir",
1605 "/tmp/repo",
1606 "--",
1607 "claude",
1608 "-p",
1609 "Fix the bug",
1610 ])
1611 .unwrap();
1612 assert!(matches!(
1613 &cli.command,
1614 Some(Commands::Spawn { name, workdir, command, .. })
1615 if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1616 && command == &["claude", "-p", "Fix the bug"]
1617 ));
1618 }
1619
1620 #[test]
1621 fn test_cli_parse_spawn_with_ink() {
1622 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1623 assert!(matches!(
1624 &cli.command,
1625 Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1626 ));
1627 }
1628
1629 #[test]
1630 fn test_cli_parse_spawn_with_description() {
1631 let cli =
1632 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1633 .unwrap();
1634 assert!(matches!(
1635 &cli.command,
1636 Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1637 ));
1638 }
1639
1640 #[test]
1641 fn test_cli_parse_spawn_name_positional() {
1642 let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1643 assert!(matches!(
1644 &cli.command,
1645 Some(Commands::Spawn { name, command, .. })
1646 if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1647 ));
1648 }
1649
1650 #[test]
1651 fn test_cli_parse_spawn_no_command() {
1652 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1653 assert!(matches!(
1654 &cli.command,
1655 Some(Commands::Spawn { command, .. }) if command.is_empty()
1656 ));
1657 }
1658
1659 #[test]
1660 fn test_cli_parse_spawn_idle_threshold() {
1661 let cli =
1662 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1663 assert!(matches!(
1664 &cli.command,
1665 Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1666 ));
1667 }
1668
1669 #[test]
1670 fn test_cli_parse_spawn_idle_threshold_60() {
1671 let cli =
1672 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "60"]).unwrap();
1673 assert!(matches!(
1674 &cli.command,
1675 Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(60)
1676 ));
1677 }
1678
1679 #[test]
1680 fn test_cli_parse_spawn_secrets() {
1681 let cli = Cli::try_parse_from([
1682 "pulpo",
1683 "spawn",
1684 "my-task",
1685 "--secret",
1686 "GITHUB_TOKEN",
1687 "--secret",
1688 "NPM_TOKEN",
1689 ])
1690 .unwrap();
1691 assert!(matches!(
1692 &cli.command,
1693 Some(Commands::Spawn { secret, .. }) if secret == &["GITHUB_TOKEN", "NPM_TOKEN"]
1694 ));
1695 }
1696
1697 #[test]
1698 fn test_cli_parse_spawn_no_secrets() {
1699 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1700 assert!(matches!(
1701 &cli.command,
1702 Some(Commands::Spawn { secret, .. }) if secret.is_empty()
1703 ));
1704 }
1705
1706 #[test]
1707 fn test_cli_parse_secret_set_with_env() {
1708 let cli = Cli::try_parse_from([
1709 "pulpo",
1710 "secret",
1711 "set",
1712 "GH_WORK",
1713 "token123",
1714 "--env",
1715 "GITHUB_TOKEN",
1716 ])
1717 .unwrap();
1718 assert!(matches!(
1719 &cli.command,
1720 Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1721 if name == "GH_WORK" && value == "token123" && env.as_deref() == Some("GITHUB_TOKEN")
1722 ));
1723 }
1724
1725 #[test]
1726 fn test_cli_parse_secret_set_without_env() {
1727 let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_KEY", "val"]).unwrap();
1728 assert!(matches!(
1729 &cli.command,
1730 Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1731 if name == "MY_KEY" && value == "val" && env.is_none()
1732 ));
1733 }
1734
1735 #[test]
1736 fn test_cli_parse_spawn_worktree() {
1737 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree"]).unwrap();
1738 assert!(matches!(
1739 &cli.command,
1740 Some(Commands::Spawn { worktree, .. }) if *worktree
1741 ));
1742 }
1743
1744 #[test]
1745 fn test_cli_parse_spawn_worktree_base() {
1746 let cli = Cli::try_parse_from([
1747 "pulpo",
1748 "spawn",
1749 "my-task",
1750 "--worktree",
1751 "--worktree-base",
1752 "main",
1753 ])
1754 .unwrap();
1755 assert!(matches!(
1756 &cli.command,
1757 Some(Commands::Spawn { worktree, worktree_base, .. })
1758 if *worktree && worktree_base.as_deref() == Some("main")
1759 ));
1760 }
1761
1762 #[test]
1763 fn test_cli_parse_spawn_worktree_base_implies_worktree() {
1764 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree-base", "develop"])
1766 .unwrap();
1767 assert!(matches!(
1768 &cli.command,
1769 Some(Commands::Spawn { worktree_base, .. })
1770 if worktree_base.as_deref() == Some("develop")
1771 ));
1772 }
1773
1774 #[test]
1775 fn test_cli_parse_worktree_list() {
1776 let cli = Cli::try_parse_from(["pulpo", "worktree", "list"]).unwrap();
1777 assert!(matches!(
1778 &cli.command,
1779 Some(Commands::Worktree {
1780 action: WorktreeAction::List
1781 })
1782 ));
1783 }
1784
1785 #[test]
1786 fn test_cli_parse_wt_alias() {
1787 let cli = Cli::try_parse_from(["pulpo", "wt", "list"]).unwrap();
1788 assert!(matches!(
1789 &cli.command,
1790 Some(Commands::Worktree {
1791 action: WorktreeAction::List
1792 })
1793 ));
1794 }
1795
1796 #[test]
1797 fn test_cli_parse_worktree_list_ls_alias() {
1798 let cli = Cli::try_parse_from(["pulpo", "wt", "ls"]).unwrap();
1799 assert!(matches!(
1800 &cli.command,
1801 Some(Commands::Worktree {
1802 action: WorktreeAction::List
1803 })
1804 ));
1805 }
1806
1807 #[test]
1808 fn test_format_worktree_sessions_empty() {
1809 let output = format_worktree_sessions(&[]);
1810 assert_eq!(output, "No worktree sessions.");
1811 }
1812
1813 #[test]
1814 fn test_format_worktree_sessions_with_data() {
1815 use chrono::Utc;
1816 use pulpo_common::session::SessionStatus;
1817 use uuid::Uuid;
1818
1819 let session = Session {
1820 id: Uuid::nil(),
1821 name: "fix-auth".into(),
1822 workdir: "/tmp/repo".into(),
1823 command: "claude -p 'fix auth'".into(),
1824 description: None,
1825 status: SessionStatus::Active,
1826 exit_code: None,
1827 backend_session_id: None,
1828 output_snapshot: None,
1829 metadata: None,
1830 ink: None,
1831 intervention_code: None,
1832 intervention_reason: None,
1833 intervention_at: None,
1834 last_output_at: None,
1835 idle_since: None,
1836 idle_threshold_secs: None,
1837 worktree_path: Some("/home/user/.pulpo/worktrees/fix-auth".into()),
1838 worktree_branch: Some("fix-auth".into()),
1839 runtime: Runtime::Tmux,
1840 created_at: Utc::now(),
1841 updated_at: Utc::now(),
1842 };
1843 let sessions = vec![&session];
1844 let output = format_worktree_sessions(&sessions);
1845 assert!(output.contains("fix-auth"), "should show name: {output}");
1846 assert!(output.contains("active"), "should show status: {output}");
1847 assert!(
1848 output.contains("/home/user/.pulpo/worktrees/fix-auth"),
1849 "should show path: {output}"
1850 );
1851 assert!(output.contains("BRANCH"), "should have header: {output}");
1852 }
1853
1854 #[test]
1855 fn test_format_worktree_sessions_no_branch() {
1856 use chrono::Utc;
1857 use pulpo_common::session::SessionStatus;
1858 use uuid::Uuid;
1859
1860 let session = Session {
1861 id: Uuid::nil(),
1862 name: "old-session".into(),
1863 workdir: "/tmp".into(),
1864 command: "echo".into(),
1865 description: None,
1866 status: SessionStatus::Active,
1867 exit_code: None,
1868 backend_session_id: None,
1869 output_snapshot: None,
1870 metadata: None,
1871 ink: None,
1872 intervention_code: None,
1873 intervention_reason: None,
1874 intervention_at: None,
1875 last_output_at: None,
1876 idle_since: None,
1877 idle_threshold_secs: None,
1878 worktree_path: Some("/home/user/.pulpo/worktrees/old-session".into()),
1879 worktree_branch: None,
1880 runtime: Runtime::Tmux,
1881 created_at: Utc::now(),
1882 updated_at: Utc::now(),
1883 };
1884 let sessions = vec![&session];
1885 let output = format_worktree_sessions(&sessions);
1886 assert!(
1887 output.contains('-'),
1888 "branch should show dash when None: {output}"
1889 );
1890 }
1891
1892 #[test]
1893 fn test_cli_parse_spawn_detach() {
1894 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
1895 assert!(matches!(
1896 &cli.command,
1897 Some(Commands::Spawn { detach, .. }) if *detach
1898 ));
1899 }
1900
1901 #[test]
1902 fn test_cli_parse_spawn_detach_short() {
1903 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
1904 assert!(matches!(
1905 &cli.command,
1906 Some(Commands::Spawn { detach, .. }) if *detach
1907 ));
1908 }
1909
1910 #[test]
1911 fn test_cli_parse_spawn_detach_default() {
1912 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1913 assert!(matches!(
1914 &cli.command,
1915 Some(Commands::Spawn { detach, .. }) if !detach
1916 ));
1917 }
1918
1919 #[test]
1920 fn test_cli_parse_logs() {
1921 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1922 assert!(matches!(
1923 &cli.command,
1924 Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 100 && !follow
1925 ));
1926 }
1927
1928 #[test]
1929 fn test_cli_parse_logs_with_lines() {
1930 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1931 assert!(matches!(
1932 &cli.command,
1933 Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 50 && !follow
1934 ));
1935 }
1936
1937 #[test]
1938 fn test_cli_parse_logs_follow() {
1939 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1940 assert!(matches!(
1941 &cli.command,
1942 Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1943 ));
1944 }
1945
1946 #[test]
1947 fn test_cli_parse_logs_follow_short() {
1948 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1949 assert!(matches!(
1950 &cli.command,
1951 Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1952 ));
1953 }
1954
1955 #[test]
1956 fn test_cli_parse_stop() {
1957 let cli = Cli::try_parse_from(["pulpo", "stop", "my-session"]).unwrap();
1958 assert!(matches!(
1959 &cli.command,
1960 Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
1961 ));
1962 }
1963
1964 #[test]
1965 fn test_cli_parse_stop_purge() {
1966 let cli = Cli::try_parse_from(["pulpo", "stop", "my-session", "--purge"]).unwrap();
1967 assert!(matches!(
1968 &cli.command,
1969 Some(Commands::Stop { names, purge }) if names == &["my-session"] && *purge
1970 ));
1971
1972 let cli = Cli::try_parse_from(["pulpo", "stop", "my-session", "-p"]).unwrap();
1973 assert!(matches!(
1974 &cli.command,
1975 Some(Commands::Stop { names, purge }) if names == &["my-session"] && *purge
1976 ));
1977 }
1978
1979 #[test]
1980 fn test_cli_parse_kill_alias() {
1981 let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1982 assert!(matches!(
1983 &cli.command,
1984 Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
1985 ));
1986 }
1987
1988 #[test]
1989 fn test_cli_parse_resume() {
1990 let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1991 assert!(matches!(
1992 &cli.command,
1993 Some(Commands::Resume { name }) if name == "my-session"
1994 ));
1995 }
1996
1997 #[test]
1998 fn test_cli_parse_input() {
1999 let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
2000 assert!(matches!(
2001 &cli.command,
2002 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("yes")
2003 ));
2004 }
2005
2006 #[test]
2007 fn test_cli_parse_input_no_text() {
2008 let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
2009 assert!(matches!(
2010 &cli.command,
2011 Some(Commands::Input { name, text }) if name == "my-session" && text.is_none()
2012 ));
2013 }
2014
2015 #[test]
2016 fn test_cli_parse_input_alias() {
2017 let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
2018 assert!(matches!(
2019 &cli.command,
2020 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
2021 ));
2022 }
2023
2024 #[test]
2025 fn test_cli_parse_custom_node() {
2026 let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
2027 assert_eq!(cli.node, "win-pc:8080");
2028 assert_eq!(cli.path.as_deref(), Some("list"));
2030 }
2031
2032 #[test]
2033 fn test_cli_version() {
2034 let result = Cli::try_parse_from(["pulpo", "--version"]);
2035 let err = result.unwrap_err();
2037 assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
2038 }
2039
2040 #[test]
2041 fn test_cli_parse_no_subcommand_succeeds() {
2042 let cli = Cli::try_parse_from(["pulpo"]).unwrap();
2043 assert!(cli.command.is_none());
2044 assert!(cli.path.is_none());
2045 }
2046
2047 #[test]
2048 fn test_cli_debug() {
2049 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2050 let debug = format!("{cli:?}");
2051 assert!(debug.contains("List"));
2052 }
2053
2054 #[test]
2055 fn test_commands_debug() {
2056 let cmd = Commands::List { all: false };
2057 assert_eq!(format!("{cmd:?}"), "List { all: false }");
2058 }
2059
2060 const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2062
2063 fn test_create_response_json() -> String {
2065 format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
2066 }
2067
2068 async fn start_test_server() -> String {
2070 use axum::http::StatusCode;
2071 use axum::{
2072 Json, Router,
2073 routing::{get, post},
2074 };
2075
2076 let create_json = test_create_response_json();
2077
2078 let app = Router::new()
2079 .route(
2080 "/api/v1/sessions",
2081 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
2082 (StatusCode::CREATED, create_json.clone())
2083 }),
2084 )
2085 .route(
2086 "/api/v1/sessions/{id}",
2087 get(|| async { TEST_SESSION_JSON.to_owned() }),
2088 )
2089 .route(
2090 "/api/v1/sessions/{id}/stop",
2091 post(|| async { StatusCode::NO_CONTENT }),
2092 )
2093 .route(
2094 "/api/v1/sessions/{id}/output",
2095 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
2096 )
2097 .route(
2098 "/api/v1/peers",
2099 get(|| async {
2100 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
2101 }),
2102 )
2103 .route(
2104 "/api/v1/sessions/{id}/resume",
2105 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
2106 )
2107 .route(
2108 "/api/v1/sessions/{id}/interventions",
2109 get(|| async { "[]".to_owned() }),
2110 )
2111 .route(
2112 "/api/v1/sessions/{id}/input",
2113 post(|| async { StatusCode::NO_CONTENT }),
2114 )
2115 .route(
2116 "/api/v1/schedules",
2117 get(|| async { Json::<Vec<()>>(vec![]) })
2118 .post(|| async { StatusCode::CREATED }),
2119 )
2120 .route(
2121 "/api/v1/schedules/{id}",
2122 axum::routing::put(|| async { StatusCode::OK })
2123 .delete(|| async { StatusCode::NO_CONTENT }),
2124 );
2125
2126 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2127 let addr = listener.local_addr().unwrap();
2128 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2129 format!("127.0.0.1:{}", addr.port())
2130 }
2131
2132 #[tokio::test]
2133 async fn test_execute_list_success() {
2134 let node = start_test_server().await;
2135 let cli = Cli {
2136 node,
2137 token: None,
2138 command: Some(Commands::List { all: false }),
2139 path: None,
2140 };
2141 let result = execute(&cli).await.unwrap();
2142 assert_eq!(result, "No sessions.");
2143 }
2144
2145 #[tokio::test]
2146 async fn test_execute_nodes_success() {
2147 let node = start_test_server().await;
2148 let cli = Cli {
2149 node,
2150 token: None,
2151 command: Some(Commands::Nodes),
2152 path: None,
2153 };
2154 let result = execute(&cli).await.unwrap();
2155 assert!(result.contains("test"));
2156 assert!(result.contains("(local)"));
2157 assert!(result.contains("NAME"));
2158 }
2159
2160 #[tokio::test]
2161 async fn test_execute_spawn_success() {
2162 let node = start_test_server().await;
2163 let cli = Cli {
2164 node,
2165 token: None,
2166 command: Some(Commands::Spawn {
2167 name: Some("test".into()),
2168 workdir: Some("/tmp/repo".into()),
2169 ink: None,
2170 description: None,
2171 detach: true,
2172 idle_threshold: None,
2173 auto: false,
2174 worktree: false,
2175 worktree_base: None,
2176 runtime: None,
2177 secret: vec![],
2178 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2179 }),
2180 path: None,
2181 };
2182 let result = execute(&cli).await.unwrap();
2183 assert!(result.contains("Created session"));
2184 assert!(result.contains("repo"));
2185 }
2186
2187 #[tokio::test]
2188 async fn test_execute_spawn_with_all_flags() {
2189 let node = start_test_server().await;
2190 let cli = Cli {
2191 node,
2192 token: None,
2193 command: Some(Commands::Spawn {
2194 name: Some("test".into()),
2195 workdir: Some("/tmp/repo".into()),
2196 ink: Some("coder".into()),
2197 description: Some("Fix the bug".into()),
2198 detach: true,
2199 idle_threshold: None,
2200 auto: false,
2201 worktree: false,
2202 worktree_base: None,
2203 runtime: None,
2204 secret: vec![],
2205 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2206 }),
2207 path: None,
2208 };
2209 let result = execute(&cli).await.unwrap();
2210 assert!(result.contains("Created session"));
2211 }
2212
2213 #[tokio::test]
2214 async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
2215 let node = start_test_server().await;
2216 let cli = Cli {
2217 node,
2218 token: None,
2219 command: Some(Commands::Spawn {
2220 name: Some("full-opts".into()),
2221 workdir: Some("/tmp/repo".into()),
2222 ink: Some("coder".into()),
2223 description: Some("Full options".into()),
2224 detach: true,
2225 idle_threshold: Some(120),
2226 auto: false,
2227 worktree: true,
2228 worktree_base: None,
2229 runtime: Some("docker".into()),
2230 secret: vec![],
2231 command: vec!["claude".into()],
2232 }),
2233 path: None,
2234 };
2235 let result = execute(&cli).await.unwrap();
2236 assert!(result.contains("Created session"));
2237 }
2238
2239 #[tokio::test]
2240 async fn test_execute_spawn_no_name_derives_from_workdir() {
2241 let node = start_test_server().await;
2242 let cli = Cli {
2243 node,
2244 token: None,
2245 command: Some(Commands::Spawn {
2246 name: None,
2247 workdir: Some("/tmp/my-project".into()),
2248 ink: None,
2249 description: None,
2250 detach: true,
2251 idle_threshold: None,
2252 auto: false,
2253 worktree: false,
2254 worktree_base: None,
2255 runtime: None,
2256 secret: vec![],
2257 command: vec!["echo".into(), "hello".into()],
2258 }),
2259 path: None,
2260 };
2261 let result = execute(&cli).await.unwrap();
2262 assert!(result.contains("Created session"));
2263 }
2264
2265 #[tokio::test]
2266 async fn test_execute_spawn_no_command() {
2267 let node = start_test_server().await;
2268 let cli = Cli {
2269 node,
2270 token: None,
2271 command: Some(Commands::Spawn {
2272 name: Some("test".into()),
2273 workdir: Some("/tmp/repo".into()),
2274 ink: None,
2275 description: None,
2276 detach: true,
2277 idle_threshold: None,
2278 auto: false,
2279 worktree: false,
2280 worktree_base: None,
2281 runtime: None,
2282 secret: vec![],
2283 command: vec![],
2284 }),
2285 path: None,
2286 };
2287 let result = execute(&cli).await.unwrap();
2288 assert!(result.contains("Created session"));
2289 }
2290
2291 #[tokio::test]
2292 async fn test_execute_spawn_with_name() {
2293 let node = start_test_server().await;
2294 let cli = Cli {
2295 node,
2296 token: None,
2297 command: Some(Commands::Spawn {
2298 name: Some("my-task".into()),
2299 workdir: Some("/tmp/repo".into()),
2300 ink: None,
2301 description: None,
2302 detach: true,
2303 idle_threshold: None,
2304 auto: false,
2305 worktree: false,
2306 worktree_base: None,
2307 runtime: None,
2308 secret: vec![],
2309 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2310 }),
2311 path: None,
2312 };
2313 let result = execute(&cli).await.unwrap();
2314 assert!(result.contains("Created session"));
2315 }
2316
2317 #[tokio::test]
2318 async fn test_execute_spawn_auto_attach() {
2319 let node = start_test_server().await;
2320 let cli = Cli {
2321 node,
2322 token: None,
2323 command: Some(Commands::Spawn {
2324 name: Some("test".into()),
2325 workdir: Some("/tmp/repo".into()),
2326 ink: None,
2327 description: None,
2328 detach: false,
2329 idle_threshold: None,
2330 auto: false,
2331 worktree: false,
2332 worktree_base: None,
2333 runtime: None,
2334 secret: vec![],
2335 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2336 }),
2337 path: None,
2338 };
2339 let result = execute(&cli).await.unwrap();
2340 assert!(result.contains("Detached from session"));
2342 }
2343
2344 #[tokio::test]
2345 async fn test_execute_stop_success() {
2346 let node = start_test_server().await;
2347 let cli = Cli {
2348 node,
2349 token: None,
2350 command: Some(Commands::Stop {
2351 names: vec!["test-session".into()],
2352 purge: false,
2353 }),
2354 path: None,
2355 };
2356 let result = execute(&cli).await.unwrap();
2357 assert!(result.contains("stopped"));
2358 assert!(!result.contains("purged"));
2359 }
2360
2361 #[tokio::test]
2362 async fn test_execute_stop_with_purge() {
2363 let node = start_test_server().await;
2364 let cli = Cli {
2365 node,
2366 token: None,
2367 command: Some(Commands::Stop {
2368 names: vec!["test-session".into()],
2369 purge: true,
2370 }),
2371 path: None,
2372 };
2373 let result = execute(&cli).await.unwrap();
2374 assert!(result.contains("stopped and purged"));
2375 }
2376
2377 #[tokio::test]
2378 async fn test_execute_logs_success() {
2379 let node = start_test_server().await;
2380 let cli = Cli {
2381 node,
2382 token: None,
2383 command: Some(Commands::Logs {
2384 name: "test-session".into(),
2385 lines: 50,
2386 follow: false,
2387 }),
2388 path: None,
2389 };
2390 let result = execute(&cli).await.unwrap();
2391 assert!(result.contains("test output"));
2392 }
2393
2394 #[tokio::test]
2395 async fn test_execute_list_connection_refused() {
2396 let cli = Cli {
2397 node: "localhost:1".into(),
2398 token: None,
2399 command: Some(Commands::List { all: false }),
2400 path: None,
2401 };
2402 let result = execute(&cli).await;
2403 let err = result.unwrap_err().to_string();
2404 assert!(
2405 err.contains("Could not connect to pulpod"),
2406 "Expected friendly error, got: {err}"
2407 );
2408 assert!(err.contains("localhost:1"));
2409 }
2410
2411 #[tokio::test]
2412 async fn test_execute_nodes_connection_refused() {
2413 let cli = Cli {
2414 node: "localhost:1".into(),
2415 token: None,
2416 command: Some(Commands::Nodes),
2417 path: None,
2418 };
2419 let result = execute(&cli).await;
2420 let err = result.unwrap_err().to_string();
2421 assert!(err.contains("Could not connect to pulpod"));
2422 }
2423
2424 #[tokio::test]
2425 async fn test_execute_stop_error_response() {
2426 use axum::{Router, http::StatusCode, routing::post};
2427
2428 let app = Router::new().route(
2429 "/api/v1/sessions/{id}/stop",
2430 post(|| async {
2431 (
2432 StatusCode::NOT_FOUND,
2433 "{\"error\":\"session not found: test-session\"}",
2434 )
2435 }),
2436 );
2437 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2438 let addr = listener.local_addr().unwrap();
2439 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2440 let node = format!("127.0.0.1:{}", addr.port());
2441
2442 let cli = Cli {
2443 node,
2444 token: None,
2445 command: Some(Commands::Stop {
2446 names: vec!["test-session".into()],
2447 purge: false,
2448 }),
2449 path: None,
2450 };
2451 let result = execute(&cli).await.unwrap();
2452 assert!(result.contains("Error stopping test-session"), "{result}");
2453 }
2454
2455 #[tokio::test]
2456 async fn test_execute_logs_error_response() {
2457 use axum::{Router, http::StatusCode, routing::get};
2458
2459 let app = Router::new().route(
2460 "/api/v1/sessions/{id}/output",
2461 get(|| async {
2462 (
2463 StatusCode::NOT_FOUND,
2464 "{\"error\":\"session not found: ghost\"}",
2465 )
2466 }),
2467 );
2468 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2469 let addr = listener.local_addr().unwrap();
2470 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2471 let node = format!("127.0.0.1:{}", addr.port());
2472
2473 let cli = Cli {
2474 node,
2475 token: None,
2476 command: Some(Commands::Logs {
2477 name: "ghost".into(),
2478 lines: 50,
2479 follow: false,
2480 }),
2481 path: None,
2482 };
2483 let err = execute(&cli).await.unwrap_err();
2484 assert_eq!(err.to_string(), "session not found: ghost");
2485 }
2486
2487 #[tokio::test]
2488 async fn test_execute_resume_error_response() {
2489 use axum::{Router, http::StatusCode, routing::post};
2490
2491 let app = Router::new().route(
2492 "/api/v1/sessions/{id}/resume",
2493 post(|| async {
2494 (
2495 StatusCode::BAD_REQUEST,
2496 "{\"error\":\"session is not lost (status: active)\"}",
2497 )
2498 }),
2499 );
2500 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2501 let addr = listener.local_addr().unwrap();
2502 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2503 let node = format!("127.0.0.1:{}", addr.port());
2504
2505 let cli = Cli {
2506 node,
2507 token: None,
2508 command: Some(Commands::Resume {
2509 name: "test-session".into(),
2510 }),
2511 path: None,
2512 };
2513 let err = execute(&cli).await.unwrap_err();
2514 assert_eq!(err.to_string(), "session is not lost (status: active)");
2515 }
2516
2517 #[tokio::test]
2518 async fn test_execute_spawn_error_response() {
2519 use axum::{Router, http::StatusCode, routing::post};
2520
2521 let app = Router::new().route(
2522 "/api/v1/sessions",
2523 post(|| async {
2524 (
2525 StatusCode::INTERNAL_SERVER_ERROR,
2526 "{\"error\":\"failed to spawn session\"}",
2527 )
2528 }),
2529 );
2530 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2531 let addr = listener.local_addr().unwrap();
2532 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2533 let node = format!("127.0.0.1:{}", addr.port());
2534
2535 let cli = Cli {
2536 node,
2537 token: None,
2538 command: Some(Commands::Spawn {
2539 name: Some("test".into()),
2540 workdir: Some("/tmp/repo".into()),
2541 ink: None,
2542 description: None,
2543 detach: true,
2544 idle_threshold: None,
2545 auto: false,
2546 worktree: false,
2547 worktree_base: None,
2548 runtime: None,
2549 secret: vec![],
2550 command: vec!["test".into()],
2551 }),
2552 path: None,
2553 };
2554 let err = execute(&cli).await.unwrap_err();
2555 assert_eq!(err.to_string(), "failed to spawn session");
2556 }
2557
2558 #[tokio::test]
2559 async fn test_execute_interventions_error_response() {
2560 use axum::{Router, http::StatusCode, routing::get};
2561
2562 let app = Router::new().route(
2563 "/api/v1/sessions/{id}/interventions",
2564 get(|| async {
2565 (
2566 StatusCode::NOT_FOUND,
2567 "{\"error\":\"session not found: ghost\"}",
2568 )
2569 }),
2570 );
2571 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2572 let addr = listener.local_addr().unwrap();
2573 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2574 let node = format!("127.0.0.1:{}", addr.port());
2575
2576 let cli = Cli {
2577 node,
2578 token: None,
2579 command: Some(Commands::Interventions {
2580 name: "ghost".into(),
2581 }),
2582 path: None,
2583 };
2584 let err = execute(&cli).await.unwrap_err();
2585 assert_eq!(err.to_string(), "session not found: ghost");
2586 }
2587
2588 #[tokio::test]
2589 async fn test_execute_resume_success() {
2590 let node = start_test_server().await;
2591 let cli = Cli {
2592 node,
2593 token: None,
2594 command: Some(Commands::Resume {
2595 name: "test-session".into(),
2596 }),
2597 path: None,
2598 };
2599 let result = execute(&cli).await.unwrap();
2600 assert!(result.contains("Detached from session"));
2601 }
2602
2603 #[tokio::test]
2604 async fn test_execute_input_success() {
2605 let node = start_test_server().await;
2606 let cli = Cli {
2607 node,
2608 token: None,
2609 command: Some(Commands::Input {
2610 name: "test-session".into(),
2611 text: Some("yes".into()),
2612 }),
2613 path: None,
2614 };
2615 let result = execute(&cli).await.unwrap();
2616 assert!(result.contains("Sent input to session test-session"));
2617 }
2618
2619 #[tokio::test]
2620 async fn test_execute_input_no_text() {
2621 let node = start_test_server().await;
2622 let cli = Cli {
2623 node,
2624 token: None,
2625 command: Some(Commands::Input {
2626 name: "test-session".into(),
2627 text: None,
2628 }),
2629 path: None,
2630 };
2631 let result = execute(&cli).await.unwrap();
2632 assert!(result.contains("Sent input to session test-session"));
2633 }
2634
2635 #[tokio::test]
2636 async fn test_execute_input_connection_refused() {
2637 let cli = Cli {
2638 node: "localhost:1".into(),
2639 token: None,
2640 command: Some(Commands::Input {
2641 name: "test".into(),
2642 text: Some("y".into()),
2643 }),
2644 path: None,
2645 };
2646 let result = execute(&cli).await;
2647 let err = result.unwrap_err().to_string();
2648 assert!(err.contains("Could not connect to pulpod"));
2649 }
2650
2651 #[tokio::test]
2652 async fn test_execute_input_error_response() {
2653 use axum::{Router, http::StatusCode, routing::post};
2654
2655 let app = Router::new().route(
2656 "/api/v1/sessions/{id}/input",
2657 post(|| async {
2658 (
2659 StatusCode::NOT_FOUND,
2660 "{\"error\":\"session not found: ghost\"}",
2661 )
2662 }),
2663 );
2664 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2665 let addr = listener.local_addr().unwrap();
2666 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2667 let node = format!("127.0.0.1:{}", addr.port());
2668
2669 let cli = Cli {
2670 node,
2671 token: None,
2672 command: Some(Commands::Input {
2673 name: "ghost".into(),
2674 text: Some("y".into()),
2675 }),
2676 path: None,
2677 };
2678 let err = execute(&cli).await.unwrap_err();
2679 assert_eq!(err.to_string(), "session not found: ghost");
2680 }
2681
2682 #[tokio::test]
2683 async fn test_execute_ui() {
2684 let cli = Cli {
2685 node: "localhost:7433".into(),
2686 token: None,
2687 command: Some(Commands::Ui),
2688 path: None,
2689 };
2690 let result = execute(&cli).await.unwrap();
2691 assert!(result.contains("Opening"));
2692 assert!(result.contains("http://localhost:7433"));
2693 }
2694
2695 #[tokio::test]
2696 async fn test_execute_ui_custom_node() {
2697 let cli = Cli {
2698 node: "mac-mini:7433".into(),
2699 token: None,
2700 command: Some(Commands::Ui),
2701 path: None,
2702 };
2703 let result = execute(&cli).await.unwrap();
2704 assert!(result.contains("http://mac-mini:7433"));
2705 }
2706
2707 #[test]
2708 fn test_format_sessions_empty() {
2709 assert_eq!(format_sessions(&[]), "No sessions.");
2710 }
2711
2712 #[test]
2713 fn test_format_sessions_with_data() {
2714 use chrono::Utc;
2715 use pulpo_common::session::SessionStatus;
2716 use uuid::Uuid;
2717
2718 let sessions = vec![Session {
2719 id: Uuid::nil(),
2720 name: "my-api".into(),
2721 workdir: "/tmp/repo".into(),
2722 command: "claude -p 'Fix the bug'".into(),
2723 description: Some("Fix the bug".into()),
2724 status: SessionStatus::Active,
2725 exit_code: None,
2726 backend_session_id: None,
2727 output_snapshot: None,
2728 metadata: None,
2729 ink: None,
2730 intervention_code: None,
2731 intervention_reason: None,
2732 intervention_at: None,
2733 last_output_at: None,
2734 idle_since: None,
2735 idle_threshold_secs: None,
2736 worktree_path: None,
2737 worktree_branch: None,
2738 runtime: Runtime::Tmux,
2739 created_at: Utc::now(),
2740 updated_at: Utc::now(),
2741 }];
2742 let output = format_sessions(&sessions);
2743 assert!(output.contains("ID"));
2744 assert!(output.contains("NAME"));
2745 assert!(output.contains("RUNTIME"));
2746 assert!(output.contains("COMMAND"));
2747 assert!(output.contains("00000000"));
2748 assert!(output.contains("my-api"));
2749 assert!(output.contains("active"));
2750 assert!(output.contains("tmux"));
2751 assert!(output.contains("claude -p 'Fix the bug'"));
2752 }
2753
2754 #[test]
2755 fn test_format_sessions_docker_runtime() {
2756 use chrono::Utc;
2757 use pulpo_common::session::SessionStatus;
2758 use uuid::Uuid;
2759
2760 let sessions = vec![Session {
2761 id: Uuid::nil(),
2762 name: "sandbox-test".into(),
2763 workdir: "/tmp".into(),
2764 command: "claude".into(),
2765 description: None,
2766 status: SessionStatus::Active,
2767 exit_code: None,
2768 backend_session_id: Some("docker:pulpo-sandbox-test".into()),
2769 output_snapshot: None,
2770 metadata: None,
2771 ink: None,
2772 intervention_code: None,
2773 intervention_reason: None,
2774 intervention_at: None,
2775 last_output_at: None,
2776 idle_since: None,
2777 idle_threshold_secs: None,
2778 worktree_path: None,
2779 worktree_branch: None,
2780 runtime: Runtime::Docker,
2781 created_at: Utc::now(),
2782 updated_at: Utc::now(),
2783 }];
2784 let output = format_sessions(&sessions);
2785 assert!(output.contains("docker"));
2786 }
2787
2788 #[test]
2789 fn test_format_sessions_long_command_truncated() {
2790 use chrono::Utc;
2791 use pulpo_common::session::SessionStatus;
2792 use uuid::Uuid;
2793
2794 let sessions = vec![Session {
2795 id: Uuid::nil(),
2796 name: "test".into(),
2797 workdir: "/tmp".into(),
2798 command:
2799 "claude -p 'A very long command that exceeds fifty characters in total length here'"
2800 .into(),
2801 description: None,
2802 status: SessionStatus::Ready,
2803 exit_code: None,
2804 backend_session_id: None,
2805 output_snapshot: None,
2806 metadata: None,
2807 ink: None,
2808 intervention_code: None,
2809 intervention_reason: None,
2810 intervention_at: None,
2811 last_output_at: None,
2812 idle_since: None,
2813 idle_threshold_secs: None,
2814 worktree_path: None,
2815 worktree_branch: None,
2816 runtime: Runtime::Tmux,
2817 created_at: Utc::now(),
2818 updated_at: Utc::now(),
2819 }];
2820 let output = format_sessions(&sessions);
2821 assert!(output.contains("..."));
2822 }
2823
2824 #[test]
2825 fn test_format_sessions_worktree_indicator() {
2826 use chrono::Utc;
2827 use pulpo_common::session::SessionStatus;
2828 use uuid::Uuid;
2829
2830 let sessions = vec![Session {
2831 id: Uuid::nil(),
2832 name: "wt-task".into(),
2833 workdir: "/repo".into(),
2834 command: "claude".into(),
2835 description: None,
2836 status: SessionStatus::Active,
2837 exit_code: None,
2838 backend_session_id: None,
2839 output_snapshot: None,
2840 metadata: None,
2841 ink: None,
2842 intervention_code: None,
2843 intervention_reason: None,
2844 intervention_at: None,
2845 last_output_at: None,
2846 idle_since: None,
2847 idle_threshold_secs: None,
2848 worktree_path: Some("/home/user/.pulpo/worktrees/wt-task".into()),
2849 worktree_branch: Some("wt-task".into()),
2850 runtime: Runtime::Tmux,
2851 created_at: Utc::now(),
2852 updated_at: Utc::now(),
2853 }];
2854 let output = format_sessions(&sessions);
2855 assert!(
2856 output.contains("[wt]"),
2857 "should show worktree indicator: {output}"
2858 );
2859 assert!(output.contains("wt-task [wt]"));
2860 }
2861
2862 #[test]
2863 fn test_format_sessions_pr_indicator() {
2864 use chrono::Utc;
2865 use pulpo_common::session::SessionStatus;
2866 use std::collections::HashMap;
2867 use uuid::Uuid;
2868
2869 let mut meta = HashMap::new();
2870 meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
2871 let sessions = vec![Session {
2872 id: Uuid::nil(),
2873 name: "pr-task".into(),
2874 workdir: "/tmp".into(),
2875 command: "claude".into(),
2876 description: None,
2877 status: SessionStatus::Active,
2878 exit_code: None,
2879 backend_session_id: None,
2880 output_snapshot: None,
2881 metadata: Some(meta),
2882 ink: None,
2883 intervention_code: None,
2884 intervention_reason: None,
2885 intervention_at: None,
2886 last_output_at: None,
2887 idle_since: None,
2888 idle_threshold_secs: None,
2889 worktree_path: None,
2890 worktree_branch: None,
2891 runtime: Runtime::Tmux,
2892 created_at: Utc::now(),
2893 updated_at: Utc::now(),
2894 }];
2895 let output = format_sessions(&sessions);
2896 assert!(
2897 output.contains("[PR]"),
2898 "should show PR indicator: {output}"
2899 );
2900 assert!(output.contains("pr-task [PR]"));
2901 }
2902
2903 #[test]
2904 fn test_format_sessions_worktree_and_pr_indicator() {
2905 use chrono::Utc;
2906 use pulpo_common::session::SessionStatus;
2907 use std::collections::HashMap;
2908 use uuid::Uuid;
2909
2910 let mut meta = HashMap::new();
2911 meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
2912 let sessions = vec![Session {
2913 id: Uuid::nil(),
2914 name: "both-task".into(),
2915 workdir: "/tmp".into(),
2916 command: "claude".into(),
2917 description: None,
2918 status: SessionStatus::Active,
2919 exit_code: None,
2920 backend_session_id: None,
2921 output_snapshot: None,
2922 metadata: Some(meta),
2923 ink: None,
2924 intervention_code: None,
2925 intervention_reason: None,
2926 intervention_at: None,
2927 last_output_at: None,
2928 idle_since: None,
2929 idle_threshold_secs: None,
2930 worktree_path: Some("/home/user/.pulpo/worktrees/both-task".into()),
2931 worktree_branch: Some("both-task".into()),
2932 runtime: Runtime::Tmux,
2933 created_at: Utc::now(),
2934 updated_at: Utc::now(),
2935 }];
2936 let output = format_sessions(&sessions);
2937 assert!(
2938 output.contains("[wt] [PR]"),
2939 "should show both indicators: {output}"
2940 );
2941 }
2942
2943 #[test]
2944 fn test_format_sessions_no_pr_without_metadata() {
2945 use chrono::Utc;
2946 use pulpo_common::session::SessionStatus;
2947 use uuid::Uuid;
2948
2949 let sessions = vec![Session {
2950 id: Uuid::nil(),
2951 name: "no-pr".into(),
2952 workdir: "/tmp".into(),
2953 command: "claude".into(),
2954 description: None,
2955 status: SessionStatus::Active,
2956 exit_code: None,
2957 backend_session_id: None,
2958 output_snapshot: None,
2959 metadata: None,
2960 ink: None,
2961 intervention_code: None,
2962 intervention_reason: None,
2963 intervention_at: None,
2964 last_output_at: None,
2965 idle_since: None,
2966 idle_threshold_secs: None,
2967 worktree_path: None,
2968 worktree_branch: None,
2969 runtime: Runtime::Tmux,
2970 created_at: Utc::now(),
2971 updated_at: Utc::now(),
2972 }];
2973 let output = format_sessions(&sessions);
2974 assert!(
2975 !output.contains("[PR]"),
2976 "should not show PR indicator: {output}"
2977 );
2978 }
2979
2980 #[test]
2981 fn test_format_nodes() {
2982 use pulpo_common::node::NodeInfo;
2983 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2984
2985 let resp = PeersResponse {
2986 local: NodeInfo {
2987 name: "mac-mini".into(),
2988 hostname: "h".into(),
2989 os: "macos".into(),
2990 arch: "arm64".into(),
2991 cpus: 8,
2992 memory_mb: 16384,
2993 gpu: None,
2994 },
2995 peers: vec![PeerInfo {
2996 name: "win-pc".into(),
2997 address: "win-pc:7433".into(),
2998 status: PeerStatus::Online,
2999 node_info: None,
3000 session_count: Some(3),
3001 source: PeerSource::Configured,
3002 }],
3003 };
3004 let output = format_nodes(&resp);
3005 assert!(output.contains("mac-mini"));
3006 assert!(output.contains("(local)"));
3007 assert!(output.contains("win-pc"));
3008 assert!(output.contains('3'));
3009 }
3010
3011 #[test]
3012 fn test_format_nodes_no_session_count() {
3013 use pulpo_common::node::NodeInfo;
3014 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
3015
3016 let resp = PeersResponse {
3017 local: NodeInfo {
3018 name: "local".into(),
3019 hostname: "h".into(),
3020 os: "linux".into(),
3021 arch: "x86_64".into(),
3022 cpus: 4,
3023 memory_mb: 8192,
3024 gpu: None,
3025 },
3026 peers: vec![PeerInfo {
3027 name: "peer".into(),
3028 address: "peer:7433".into(),
3029 status: PeerStatus::Offline,
3030 node_info: None,
3031 session_count: None,
3032 source: PeerSource::Configured,
3033 }],
3034 };
3035 let output = format_nodes(&resp);
3036 assert!(output.contains("offline"));
3037 let lines: Vec<&str> = output.lines().collect();
3039 assert!(lines[2].contains('-'));
3040 }
3041
3042 #[tokio::test]
3043 async fn test_execute_resume_connection_refused() {
3044 let cli = Cli {
3045 node: "localhost:1".into(),
3046 token: None,
3047 command: Some(Commands::Resume {
3048 name: "test".into(),
3049 }),
3050 path: None,
3051 };
3052 let result = execute(&cli).await;
3053 let err = result.unwrap_err().to_string();
3054 assert!(err.contains("Could not connect to pulpod"));
3055 }
3056
3057 #[tokio::test]
3058 async fn test_execute_spawn_connection_refused() {
3059 let cli = Cli {
3060 node: "localhost:1".into(),
3061 token: None,
3062 command: Some(Commands::Spawn {
3063 name: Some("test".into()),
3064 workdir: Some("/tmp".into()),
3065 ink: None,
3066 description: None,
3067 detach: true,
3068 idle_threshold: None,
3069 auto: false,
3070 worktree: false,
3071 worktree_base: None,
3072 runtime: None,
3073 secret: vec![],
3074 command: vec!["test".into()],
3075 }),
3076 path: None,
3077 };
3078 let result = execute(&cli).await;
3079 let err = result.unwrap_err().to_string();
3080 assert!(err.contains("Could not connect to pulpod"));
3081 }
3082
3083 #[tokio::test]
3084 async fn test_execute_stop_connection_refused() {
3085 let cli = Cli {
3086 node: "localhost:1".into(),
3087 token: None,
3088 command: Some(Commands::Stop {
3089 names: vec!["test".into()],
3090 purge: false,
3091 }),
3092 path: None,
3093 };
3094 let result = execute(&cli).await;
3095 let err = result.unwrap_err().to_string();
3096 assert!(err.contains("Could not connect to pulpod"));
3097 }
3098
3099 #[tokio::test]
3100 async fn test_execute_logs_connection_refused() {
3101 let cli = Cli {
3102 node: "localhost:1".into(),
3103 token: None,
3104 command: Some(Commands::Logs {
3105 name: "test".into(),
3106 lines: 50,
3107 follow: false,
3108 }),
3109 path: None,
3110 };
3111 let result = execute(&cli).await;
3112 let err = result.unwrap_err().to_string();
3113 assert!(err.contains("Could not connect to pulpod"));
3114 }
3115
3116 #[tokio::test]
3117 async fn test_friendly_error_connect() {
3118 let err = reqwest::Client::new()
3120 .get("http://127.0.0.1:1")
3121 .send()
3122 .await
3123 .unwrap_err();
3124 let friendly = friendly_error(&err, "test-node:1");
3125 let msg = friendly.to_string();
3126 assert!(
3127 msg.contains("Could not connect"),
3128 "Expected connect message, got: {msg}"
3129 );
3130 }
3131
3132 #[tokio::test]
3133 async fn test_friendly_error_other() {
3134 let err = reqwest::Client::new()
3136 .get("http://[::invalid::url")
3137 .send()
3138 .await
3139 .unwrap_err();
3140 let friendly = friendly_error(&err, "bad-host");
3141 let msg = friendly.to_string();
3142 assert!(
3143 msg.contains("Network error"),
3144 "Expected network error message, got: {msg}"
3145 );
3146 assert!(msg.contains("bad-host"));
3147 }
3148
3149 #[test]
3152 fn test_is_localhost_variants() {
3153 assert!(is_localhost("localhost:7433"));
3154 assert!(is_localhost("127.0.0.1:7433"));
3155 assert!(is_localhost("[::1]:7433"));
3156 assert!(is_localhost("::1"));
3157 assert!(is_localhost("localhost"));
3158 assert!(!is_localhost("mac-mini:7433"));
3159 assert!(!is_localhost("192.168.1.100:7433"));
3160 }
3161
3162 #[test]
3163 fn test_authed_get_with_token() {
3164 let client = reqwest::Client::new();
3165 let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
3166 .build()
3167 .unwrap();
3168 let auth = req
3169 .headers()
3170 .get("authorization")
3171 .unwrap()
3172 .to_str()
3173 .unwrap();
3174 assert_eq!(auth, "Bearer tok");
3175 }
3176
3177 #[test]
3178 fn test_authed_get_without_token() {
3179 let client = reqwest::Client::new();
3180 let req = authed_get(&client, "http://h:1/api".into(), None)
3181 .build()
3182 .unwrap();
3183 assert!(req.headers().get("authorization").is_none());
3184 }
3185
3186 #[test]
3187 fn test_authed_post_with_token() {
3188 let client = reqwest::Client::new();
3189 let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
3190 .build()
3191 .unwrap();
3192 let auth = req
3193 .headers()
3194 .get("authorization")
3195 .unwrap()
3196 .to_str()
3197 .unwrap();
3198 assert_eq!(auth, "Bearer secret");
3199 }
3200
3201 #[test]
3202 fn test_authed_post_without_token() {
3203 let client = reqwest::Client::new();
3204 let req = authed_post(&client, "http://h:1/api".into(), None)
3205 .build()
3206 .unwrap();
3207 assert!(req.headers().get("authorization").is_none());
3208 }
3209
3210 #[test]
3211 fn test_authed_delete_with_token() {
3212 let client = reqwest::Client::new();
3213 let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
3214 .build()
3215 .unwrap();
3216 let auth = req
3217 .headers()
3218 .get("authorization")
3219 .unwrap()
3220 .to_str()
3221 .unwrap();
3222 assert_eq!(auth, "Bearer del-tok");
3223 }
3224
3225 #[test]
3226 fn test_authed_delete_without_token() {
3227 let client = reqwest::Client::new();
3228 let req = authed_delete(&client, "http://h:1/api".into(), None)
3229 .build()
3230 .unwrap();
3231 assert!(req.headers().get("authorization").is_none());
3232 }
3233
3234 #[tokio::test]
3235 async fn test_resolve_token_explicit() {
3236 let client = reqwest::Client::new();
3237 let token =
3238 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
3239 assert_eq!(token, Some("my-tok".into()));
3240 }
3241
3242 #[tokio::test]
3243 async fn test_resolve_token_remote_no_explicit() {
3244 let client = reqwest::Client::new();
3245 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
3246 assert_eq!(token, None);
3247 }
3248
3249 #[tokio::test]
3250 async fn test_resolve_token_localhost_auto_discover() {
3251 use axum::{Json, Router, routing::get};
3252
3253 let app = Router::new().route(
3254 "/api/v1/auth/token",
3255 get(|| async {
3256 Json(AuthTokenResponse {
3257 token: "discovered".into(),
3258 })
3259 }),
3260 );
3261 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3262 let addr = listener.local_addr().unwrap();
3263 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3264
3265 let node = format!("localhost:{}", addr.port());
3266 let base = base_url(&node);
3267 let client = reqwest::Client::new();
3268 let token = resolve_token(&client, &base, &node, None).await;
3269 assert_eq!(token, Some("discovered".into()));
3270 }
3271
3272 #[tokio::test]
3273 async fn test_discover_token_empty_returns_none() {
3274 use axum::{Json, Router, routing::get};
3275
3276 let app = Router::new().route(
3277 "/api/v1/auth/token",
3278 get(|| async {
3279 Json(AuthTokenResponse {
3280 token: String::new(),
3281 })
3282 }),
3283 );
3284 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3285 let addr = listener.local_addr().unwrap();
3286 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3287
3288 let base = format!("http://127.0.0.1:{}", addr.port());
3289 let client = reqwest::Client::new();
3290 assert_eq!(discover_token(&client, &base).await, None);
3291 }
3292
3293 #[tokio::test]
3294 async fn test_discover_token_unreachable_returns_none() {
3295 let client = reqwest::Client::new();
3296 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3297 }
3298
3299 #[test]
3300 fn test_cli_parse_with_token() {
3301 let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
3302 assert_eq!(cli.token, Some("my-secret".into()));
3303 }
3304
3305 #[test]
3306 fn test_cli_parse_without_token() {
3307 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
3308 assert_eq!(cli.token, None);
3309 }
3310
3311 #[tokio::test]
3312 async fn test_execute_with_explicit_token_sends_header() {
3313 use axum::{Router, extract::Request, http::StatusCode, routing::get};
3314
3315 let app = Router::new().route(
3316 "/api/v1/sessions",
3317 get(|req: Request| async move {
3318 let auth = req
3319 .headers()
3320 .get("authorization")
3321 .and_then(|v| v.to_str().ok())
3322 .unwrap_or("");
3323 assert_eq!(auth, "Bearer test-token");
3324 (StatusCode::OK, "[]".to_owned())
3325 }),
3326 );
3327 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3328 let addr = listener.local_addr().unwrap();
3329 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3330 let node = format!("127.0.0.1:{}", addr.port());
3331
3332 let cli = Cli {
3333 node,
3334 token: Some("test-token".into()),
3335 command: Some(Commands::List { all: false }),
3336 path: None,
3337 };
3338 let result = execute(&cli).await.unwrap();
3339 assert_eq!(result, "No sessions.");
3340 }
3341
3342 #[test]
3345 fn test_cli_parse_interventions() {
3346 let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
3347 assert!(matches!(
3348 &cli.command,
3349 Some(Commands::Interventions { name }) if name == "my-session"
3350 ));
3351 }
3352
3353 #[test]
3354 fn test_format_interventions_empty() {
3355 assert_eq!(format_interventions(&[]), "No intervention events.");
3356 }
3357
3358 #[test]
3359 fn test_format_interventions_with_data() {
3360 let events = vec![
3361 InterventionEventResponse {
3362 id: 1,
3363 session_id: "sess-1".into(),
3364 code: None,
3365 reason: "Memory exceeded threshold".into(),
3366 created_at: "2026-01-01T00:00:00Z".into(),
3367 },
3368 InterventionEventResponse {
3369 id: 2,
3370 session_id: "sess-1".into(),
3371 code: None,
3372 reason: "Idle for 10 minutes".into(),
3373 created_at: "2026-01-02T00:00:00Z".into(),
3374 },
3375 ];
3376 let output = format_interventions(&events);
3377 assert!(output.contains("ID"));
3378 assert!(output.contains("TIMESTAMP"));
3379 assert!(output.contains("REASON"));
3380 assert!(output.contains("Memory exceeded threshold"));
3381 assert!(output.contains("Idle for 10 minutes"));
3382 assert!(output.contains("2026-01-01T00:00:00Z"));
3383 }
3384
3385 #[tokio::test]
3386 async fn test_execute_interventions_empty() {
3387 let node = start_test_server().await;
3388 let cli = Cli {
3389 node,
3390 token: None,
3391 command: Some(Commands::Interventions {
3392 name: "my-session".into(),
3393 }),
3394 path: None,
3395 };
3396 let result = execute(&cli).await.unwrap();
3397 assert_eq!(result, "No intervention events.");
3398 }
3399
3400 #[tokio::test]
3401 async fn test_execute_interventions_with_data() {
3402 use axum::{Router, routing::get};
3403
3404 let app = Router::new().route(
3405 "/api/v1/sessions/{id}/interventions",
3406 get(|| async {
3407 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3408 .to_owned()
3409 }),
3410 );
3411 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3412 let addr = listener.local_addr().unwrap();
3413 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3414 let node = format!("127.0.0.1:{}", addr.port());
3415
3416 let cli = Cli {
3417 node,
3418 token: None,
3419 command: Some(Commands::Interventions {
3420 name: "test".into(),
3421 }),
3422 path: None,
3423 };
3424 let result = execute(&cli).await.unwrap();
3425 assert!(result.contains("OOM"));
3426 assert!(result.contains("2026-01-01T00:00:00Z"));
3427 }
3428
3429 #[tokio::test]
3430 async fn test_execute_interventions_connection_refused() {
3431 let cli = Cli {
3432 node: "localhost:1".into(),
3433 token: None,
3434 command: Some(Commands::Interventions {
3435 name: "test".into(),
3436 }),
3437 path: None,
3438 };
3439 let result = execute(&cli).await;
3440 let err = result.unwrap_err().to_string();
3441 assert!(err.contains("Could not connect to pulpod"));
3442 }
3443
3444 #[test]
3447 fn test_build_attach_command_tmux() {
3448 let cmd = build_attach_command("my-session");
3449 assert_eq!(cmd.get_program(), "tmux");
3450 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3451 assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
3452 }
3453
3454 #[test]
3455 fn test_build_attach_command_docker() {
3456 let cmd = build_attach_command("docker:pulpo-my-task");
3457 assert_eq!(cmd.get_program(), "docker");
3458 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3459 assert_eq!(args, vec!["exec", "-it", "pulpo-my-task", "/bin/sh"]);
3460 }
3461
3462 #[test]
3463 fn test_cli_parse_attach() {
3464 let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
3465 assert!(matches!(
3466 &cli.command,
3467 Some(Commands::Attach { name }) if name == "my-session"
3468 ));
3469 }
3470
3471 #[test]
3472 fn test_cli_parse_attach_alias() {
3473 let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
3474 assert!(matches!(
3475 &cli.command,
3476 Some(Commands::Attach { name }) if name == "my-session"
3477 ));
3478 }
3479
3480 #[tokio::test]
3481 async fn test_execute_attach_success() {
3482 let node = start_test_server().await;
3483 let cli = Cli {
3484 node,
3485 token: None,
3486 command: Some(Commands::Attach {
3487 name: "test-session".into(),
3488 }),
3489 path: None,
3490 };
3491 let result = execute(&cli).await.unwrap();
3492 assert!(result.contains("Detached from session test-session"));
3493 }
3494
3495 #[tokio::test]
3496 async fn test_execute_attach_with_backend_session_id() {
3497 use axum::{Router, routing::get};
3498 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3499 let app = Router::new().route(
3500 "/api/v1/sessions/{id}",
3501 get(move || async move { session_json.to_owned() }),
3502 );
3503 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3504 let addr = listener.local_addr().unwrap();
3505 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3506
3507 let cli = Cli {
3508 node: format!("127.0.0.1:{}", addr.port()),
3509 token: None,
3510 command: Some(Commands::Attach {
3511 name: "my-session".into(),
3512 }),
3513 path: None,
3514 };
3515 let result = execute(&cli).await.unwrap();
3516 assert!(result.contains("Detached from session my-session"));
3517 }
3518
3519 #[tokio::test]
3520 async fn test_execute_attach_connection_refused() {
3521 let cli = Cli {
3522 node: "localhost:1".into(),
3523 token: None,
3524 command: Some(Commands::Attach {
3525 name: "test-session".into(),
3526 }),
3527 path: None,
3528 };
3529 let result = execute(&cli).await;
3530 let err = result.unwrap_err().to_string();
3531 assert!(err.contains("Could not connect to pulpod"));
3532 }
3533
3534 #[tokio::test]
3535 async fn test_execute_attach_error_response() {
3536 use axum::{Router, http::StatusCode, routing::get};
3537 let app = Router::new().route(
3538 "/api/v1/sessions/{id}",
3539 get(|| async {
3540 (
3541 StatusCode::NOT_FOUND,
3542 r#"{"error":"session not found"}"#.to_owned(),
3543 )
3544 }),
3545 );
3546 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3547 let addr = listener.local_addr().unwrap();
3548 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3549
3550 let cli = Cli {
3551 node: format!("127.0.0.1:{}", addr.port()),
3552 token: None,
3553 command: Some(Commands::Attach {
3554 name: "nonexistent".into(),
3555 }),
3556 path: None,
3557 };
3558 let result = execute(&cli).await;
3559 let err = result.unwrap_err().to_string();
3560 assert!(err.contains("session not found"));
3561 }
3562
3563 #[tokio::test]
3564 async fn test_execute_attach_stale_session() {
3565 use axum::{Router, routing::get};
3566 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3567 let app = Router::new().route(
3568 "/api/v1/sessions/{id}",
3569 get(move || async move { session_json.to_owned() }),
3570 );
3571 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3572 let addr = listener.local_addr().unwrap();
3573 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3574
3575 let cli = Cli {
3576 node: format!("127.0.0.1:{}", addr.port()),
3577 token: None,
3578 command: Some(Commands::Attach {
3579 name: "stale-sess".into(),
3580 }),
3581 path: None,
3582 };
3583 let result = execute(&cli).await;
3584 let err = result.unwrap_err().to_string();
3585 assert!(err.contains("lost"));
3586 assert!(err.contains("pulpo resume"));
3587 }
3588
3589 #[tokio::test]
3590 async fn test_execute_attach_dead_session() {
3591 use axum::{Router, routing::get};
3592 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3593 let app = Router::new().route(
3594 "/api/v1/sessions/{id}",
3595 get(move || async move { session_json.to_owned() }),
3596 );
3597 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3598 let addr = listener.local_addr().unwrap();
3599 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3600
3601 let cli = Cli {
3602 node: format!("127.0.0.1:{}", addr.port()),
3603 token: None,
3604 command: Some(Commands::Attach {
3605 name: "dead-sess".into(),
3606 }),
3607 path: None,
3608 };
3609 let result = execute(&cli).await;
3610 let err = result.unwrap_err().to_string();
3611 assert!(err.contains("stopped"));
3612 assert!(err.contains("cannot attach"));
3613 }
3614
3615 #[test]
3618 fn test_cli_parse_alias_spawn() {
3619 let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
3620 assert!(matches!(&cli.command, Some(Commands::Spawn { .. })));
3621 }
3622
3623 #[test]
3624 fn test_cli_parse_alias_list() {
3625 let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
3626 assert!(matches!(&cli.command, Some(Commands::List { all: false })));
3627 }
3628
3629 #[test]
3630 fn test_cli_parse_list_all() {
3631 let cli = Cli::try_parse_from(["pulpo", "ls", "-a"]).unwrap();
3632 assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3633
3634 let cli = Cli::try_parse_from(["pulpo", "list", "--all"]).unwrap();
3635 assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3636 }
3637
3638 #[test]
3639 fn test_cli_parse_alias_logs() {
3640 let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
3641 assert!(matches!(
3642 &cli.command,
3643 Some(Commands::Logs { name, .. }) if name == "my-session"
3644 ));
3645 }
3646
3647 #[test]
3648 fn test_cli_parse_alias_stop() {
3649 let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
3650 assert!(matches!(
3651 &cli.command,
3652 Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
3653 ));
3654 }
3655
3656 #[test]
3657 fn test_cli_parse_alias_resume() {
3658 let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
3659 assert!(matches!(
3660 &cli.command,
3661 Some(Commands::Resume { name }) if name == "my-session"
3662 ));
3663 }
3664
3665 #[test]
3666 fn test_cli_parse_alias_nodes() {
3667 let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
3668 assert!(matches!(&cli.command, Some(Commands::Nodes)));
3669 }
3670
3671 #[test]
3672 fn test_cli_parse_alias_interventions() {
3673 let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
3674 assert!(matches!(
3675 &cli.command,
3676 Some(Commands::Interventions { name }) if name == "my-session"
3677 ));
3678 }
3679
3680 #[test]
3681 fn test_api_error_json() {
3682 let err = api_error("{\"error\":\"session not found: foo\"}");
3683 assert_eq!(err.to_string(), "session not found: foo");
3684 }
3685
3686 #[test]
3687 fn test_api_error_plain_text() {
3688 let err = api_error("plain text error");
3689 assert_eq!(err.to_string(), "plain text error");
3690 }
3691
3692 #[test]
3695 fn test_diff_output_empty_prev() {
3696 assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
3697 }
3698
3699 #[test]
3700 fn test_diff_output_identical() {
3701 assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
3702 }
3703
3704 #[test]
3705 fn test_diff_output_new_lines_appended() {
3706 let prev = "line1\nline2";
3707 let new = "line1\nline2\nline3\nline4";
3708 assert_eq!(diff_output(prev, new), "line3\nline4");
3709 }
3710
3711 #[test]
3712 fn test_diff_output_scrolled_window() {
3713 let prev = "line1\nline2\nline3";
3715 let new = "line2\nline3\nline4";
3716 assert_eq!(diff_output(prev, new), "line4");
3717 }
3718
3719 #[test]
3720 fn test_diff_output_completely_different() {
3721 let prev = "aaa\nbbb";
3722 let new = "xxx\nyyy";
3723 assert_eq!(diff_output(prev, new), "xxx\nyyy");
3724 }
3725
3726 #[test]
3727 fn test_diff_output_last_line_matches_but_overlap_fails() {
3728 let prev = "aaa\ncommon";
3730 let new = "zzz\ncommon\nnew_line";
3731 assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
3735 }
3736
3737 #[test]
3738 fn test_diff_output_new_empty() {
3739 assert_eq!(diff_output("line1", ""), "");
3740 }
3741
3742 async fn start_follow_test_server() -> String {
3747 use axum::{Router, extract::Path, extract::Query, routing::get};
3748 use std::sync::Arc;
3749 use std::sync::atomic::{AtomicUsize, Ordering};
3750
3751 let call_count = Arc::new(AtomicUsize::new(0));
3752 let output_count = call_count.clone();
3753
3754 let app = Router::new()
3755 .route(
3756 "/api/v1/sessions/{id}/output",
3757 get(
3758 move |_path: Path<String>,
3759 _query: Query<std::collections::HashMap<String, String>>| {
3760 let count = output_count.clone();
3761 async move {
3762 let n = count.fetch_add(1, Ordering::SeqCst);
3763 let output = match n {
3764 0 => "line1\nline2".to_owned(),
3765 1 => "line1\nline2\nline3".to_owned(),
3766 _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
3767 };
3768 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
3769 }
3770 },
3771 ),
3772 )
3773 .route(
3774 "/api/v1/sessions/{id}",
3775 get(|_path: Path<String>| async {
3776 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3777 }),
3778 );
3779
3780 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3781 let addr = listener.local_addr().unwrap();
3782 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3783 format!("http://127.0.0.1:{}", addr.port())
3784 }
3785
3786 #[tokio::test]
3787 async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
3788 let base = start_follow_test_server().await;
3789 let client = reqwest::Client::new();
3790 let mut buf = Vec::new();
3791
3792 follow_logs(&client, &base, "test", 100, None, &mut buf)
3793 .await
3794 .unwrap();
3795
3796 let output = String::from_utf8(buf).unwrap();
3797 assert!(output.contains("line1"));
3799 assert!(output.contains("line2"));
3800 assert!(output.contains("line3"));
3801 assert!(output.contains("line4"));
3802 assert!(output.contains("[pulpo] Agent exited"));
3803 }
3804
3805 #[tokio::test]
3806 async fn test_execute_logs_follow_success() {
3807 let base = start_follow_test_server().await;
3808 let node = base.strip_prefix("http://").unwrap().to_owned();
3810
3811 let cli = Cli {
3812 node,
3813 token: None,
3814 command: Some(Commands::Logs {
3815 name: "test".into(),
3816 lines: 100,
3817 follow: true,
3818 }),
3819 path: None,
3820 };
3821 let result = execute(&cli).await.unwrap();
3823 assert_eq!(result, "");
3824 }
3825
3826 #[tokio::test]
3827 async fn test_execute_logs_follow_connection_refused() {
3828 let cli = Cli {
3829 node: "localhost:1".into(),
3830 token: None,
3831 command: Some(Commands::Logs {
3832 name: "test".into(),
3833 lines: 50,
3834 follow: true,
3835 }),
3836 path: None,
3837 };
3838 let result = execute(&cli).await;
3839 let err = result.unwrap_err().to_string();
3840 assert!(
3841 err.contains("Could not connect to pulpod"),
3842 "Expected friendly error, got: {err}"
3843 );
3844 }
3845
3846 #[tokio::test]
3847 async fn test_follow_logs_exits_on_dead() {
3848 use axum::{Router, extract::Path, extract::Query, routing::get};
3849
3850 let app = Router::new()
3851 .route(
3852 "/api/v1/sessions/{id}/output",
3853 get(
3854 |_path: Path<String>,
3855 _query: Query<std::collections::HashMap<String, String>>| async {
3856 r#"{"output":"some output"}"#.to_owned()
3857 },
3858 ),
3859 )
3860 .route(
3861 "/api/v1/sessions/{id}",
3862 get(|_path: Path<String>| async {
3863 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3864 }),
3865 );
3866
3867 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3868 let addr = listener.local_addr().unwrap();
3869 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3870 let base = format!("http://127.0.0.1:{}", addr.port());
3871
3872 let client = reqwest::Client::new();
3873 let mut buf = Vec::new();
3874 follow_logs(&client, &base, "test", 100, None, &mut buf)
3875 .await
3876 .unwrap();
3877
3878 let output = String::from_utf8(buf).unwrap();
3879 assert!(output.contains("some output"));
3880 }
3881
3882 #[tokio::test]
3883 async fn test_follow_logs_exits_on_stale() {
3884 use axum::{Router, extract::Path, extract::Query, routing::get};
3885
3886 let app = Router::new()
3887 .route(
3888 "/api/v1/sessions/{id}/output",
3889 get(
3890 |_path: Path<String>,
3891 _query: Query<std::collections::HashMap<String, String>>| async {
3892 r#"{"output":"stale output"}"#.to_owned()
3893 },
3894 ),
3895 )
3896 .route(
3897 "/api/v1/sessions/{id}",
3898 get(|_path: Path<String>| async {
3899 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3900 }),
3901 );
3902
3903 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3904 let addr = listener.local_addr().unwrap();
3905 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3906 let base = format!("http://127.0.0.1:{}", addr.port());
3907
3908 let client = reqwest::Client::new();
3909 let mut buf = Vec::new();
3910 follow_logs(&client, &base, "test", 100, None, &mut buf)
3911 .await
3912 .unwrap();
3913
3914 let output = String::from_utf8(buf).unwrap();
3915 assert!(output.contains("stale output"));
3916 }
3917
3918 #[tokio::test]
3919 async fn test_execute_logs_follow_non_reqwest_error() {
3920 use axum::{Router, extract::Path, extract::Query, routing::get};
3921
3922 let app = Router::new()
3924 .route(
3925 "/api/v1/sessions/{id}/output",
3926 get(
3927 |_path: Path<String>,
3928 _query: Query<std::collections::HashMap<String, String>>| async {
3929 r#"{"output":"initial"}"#.to_owned()
3930 },
3931 ),
3932 )
3933 .route(
3934 "/api/v1/sessions/{id}",
3935 get(|_path: Path<String>| async { "not valid json".to_owned() }),
3936 );
3937
3938 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3939 let addr = listener.local_addr().unwrap();
3940 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3941 let node = format!("127.0.0.1:{}", addr.port());
3942
3943 let cli = Cli {
3944 node,
3945 token: None,
3946 command: Some(Commands::Logs {
3947 name: "test".into(),
3948 lines: 100,
3949 follow: true,
3950 }),
3951 path: None,
3952 };
3953 let err = execute(&cli).await.unwrap_err();
3954 let msg = err.to_string();
3956 assert!(
3957 msg.contains("expected ident"),
3958 "Expected serde parse error, got: {msg}"
3959 );
3960 }
3961
3962 #[tokio::test]
3963 async fn test_fetch_session_status_connection_error() {
3964 let client = reqwest::Client::new();
3965 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3966 assert!(result.is_err());
3967 }
3968
3969 #[test]
3972 fn test_format_schedules_empty() {
3973 assert_eq!(format_schedules(&[]), "No schedules.");
3974 }
3975
3976 #[test]
3977 fn test_format_schedules_with_entries() {
3978 let schedules = vec![serde_json::json!({
3979 "name": "nightly",
3980 "cron": "0 3 * * *",
3981 "enabled": true,
3982 "last_run_at": null,
3983 "target_node": null
3984 })];
3985 let output = format_schedules(&schedules);
3986 assert!(output.contains("nightly"));
3987 assert!(output.contains("0 3 * * *"));
3988 assert!(output.contains("local"));
3989 assert!(output.contains("yes"));
3990 assert!(output.contains('-'));
3991 }
3992
3993 #[test]
3994 fn test_format_schedules_disabled_entry() {
3995 let schedules = vec![serde_json::json!({
3996 "name": "weekly",
3997 "cron": "0 0 * * 0",
3998 "enabled": false,
3999 "last_run_at": "2026-03-18T03:00:00Z",
4000 "target_node": "gpu-box"
4001 })];
4002 let output = format_schedules(&schedules);
4003 assert!(output.contains("weekly"));
4004 assert!(output.contains("no"));
4005 assert!(output.contains("gpu-box"));
4006 assert!(output.contains("2026-03-18T03:00"));
4007 }
4008
4009 #[test]
4010 fn test_format_schedules_header() {
4011 let schedules = vec![serde_json::json!({
4012 "name": "test",
4013 "cron": "* * * * *",
4014 "enabled": true,
4015 "last_run_at": null,
4016 "target_node": null
4017 })];
4018 let output = format_schedules(&schedules);
4019 assert!(output.contains("NAME"));
4020 assert!(output.contains("CRON"));
4021 assert!(output.contains("ENABLED"));
4022 assert!(output.contains("LAST RUN"));
4023 assert!(output.contains("NODE"));
4024 }
4025
4026 #[test]
4029 fn test_cli_parse_schedule_add() {
4030 let cli = Cli::try_parse_from([
4031 "pulpo",
4032 "schedule",
4033 "add",
4034 "nightly",
4035 "0 3 * * *",
4036 "--workdir",
4037 "/repo",
4038 "--",
4039 "claude",
4040 "-p",
4041 "review",
4042 ])
4043 .unwrap();
4044 assert!(matches!(
4045 &cli.command,
4046 Some(Commands::Schedule {
4047 action: ScheduleAction::Add { name, cron, .. }
4048 }) if name == "nightly" && cron == "0 3 * * *"
4049 ));
4050 }
4051
4052 #[test]
4053 fn test_cli_parse_schedule_add_with_node() {
4054 let cli = Cli::try_parse_from([
4055 "pulpo",
4056 "schedule",
4057 "add",
4058 "nightly",
4059 "0 3 * * *",
4060 "--workdir",
4061 "/repo",
4062 "--node",
4063 "gpu-box",
4064 "--",
4065 "claude",
4066 ])
4067 .unwrap();
4068 assert!(matches!(
4069 &cli.command,
4070 Some(Commands::Schedule {
4071 action: ScheduleAction::Add { node, .. }
4072 }) if node.as_deref() == Some("gpu-box")
4073 ));
4074 }
4075
4076 #[test]
4077 fn test_cli_parse_schedule_add_install_alias() {
4078 let cli =
4079 Cli::try_parse_from(["pulpo", "schedule", "install", "nightly", "0 3 * * *"]).unwrap();
4080 assert!(matches!(
4081 &cli.command,
4082 Some(Commands::Schedule {
4083 action: ScheduleAction::Add { name, .. }
4084 }) if name == "nightly"
4085 ));
4086 }
4087
4088 #[test]
4089 fn test_cli_parse_schedule_list() {
4090 let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
4091 assert!(matches!(
4092 &cli.command,
4093 Some(Commands::Schedule {
4094 action: ScheduleAction::List
4095 })
4096 ));
4097 }
4098
4099 #[test]
4100 fn test_cli_parse_schedule_remove() {
4101 let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
4102 assert!(matches!(
4103 &cli.command,
4104 Some(Commands::Schedule {
4105 action: ScheduleAction::Remove { name }
4106 }) if name == "nightly"
4107 ));
4108 }
4109
4110 #[test]
4111 fn test_cli_parse_schedule_pause() {
4112 let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
4113 assert!(matches!(
4114 &cli.command,
4115 Some(Commands::Schedule {
4116 action: ScheduleAction::Pause { name }
4117 }) if name == "nightly"
4118 ));
4119 }
4120
4121 #[test]
4122 fn test_cli_parse_schedule_resume() {
4123 let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
4124 assert!(matches!(
4125 &cli.command,
4126 Some(Commands::Schedule {
4127 action: ScheduleAction::Resume { name }
4128 }) if name == "nightly"
4129 ));
4130 }
4131
4132 #[test]
4133 fn test_cli_parse_schedule_alias() {
4134 let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
4135 assert!(matches!(
4136 &cli.command,
4137 Some(Commands::Schedule {
4138 action: ScheduleAction::List
4139 })
4140 ));
4141 }
4142
4143 #[test]
4144 fn test_cli_parse_schedule_list_alias() {
4145 let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
4146 assert!(matches!(
4147 &cli.command,
4148 Some(Commands::Schedule {
4149 action: ScheduleAction::List
4150 })
4151 ));
4152 }
4153
4154 #[test]
4155 fn test_cli_parse_schedule_remove_alias() {
4156 let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
4157 assert!(matches!(
4158 &cli.command,
4159 Some(Commands::Schedule {
4160 action: ScheduleAction::Remove { name }
4161 }) if name == "nightly"
4162 ));
4163 }
4164
4165 #[tokio::test]
4166 async fn test_execute_schedule_list_via_execute() {
4167 let node = start_test_server().await;
4168 let cli = Cli {
4169 node,
4170 token: None,
4171 command: Some(Commands::Schedule {
4172 action: ScheduleAction::List,
4173 }),
4174 path: None,
4175 };
4176 let result = execute(&cli).await.unwrap();
4177 #[cfg(coverage)]
4179 assert!(result.is_empty());
4180 #[cfg(not(coverage))]
4181 assert_eq!(result, "No schedules.");
4182 }
4183
4184 #[test]
4185 fn test_schedule_action_debug() {
4186 let action = ScheduleAction::List;
4187 assert_eq!(format!("{action:?}"), "List");
4188 }
4189
4190 #[test]
4191 fn test_cli_parse_send_alias() {
4192 let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
4193 assert!(matches!(
4194 &cli.command,
4195 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
4196 ));
4197 }
4198
4199 #[test]
4200 fn test_cli_parse_spawn_no_name() {
4201 let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
4202 assert!(matches!(
4203 &cli.command,
4204 Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
4205 ));
4206 }
4207
4208 #[test]
4209 fn test_cli_parse_spawn_optional_name_with_command() {
4210 let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
4211 assert!(matches!(
4212 &cli.command,
4213 Some(Commands::Spawn { name, command, .. })
4214 if name.is_none() && command == &["echo", "hello"]
4215 ));
4216 }
4217
4218 #[test]
4219 fn test_cli_parse_path_shortcut() {
4220 let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
4221 assert!(cli.command.is_none());
4222 assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
4223 }
4224
4225 #[test]
4226 fn test_cli_parse_no_args() {
4227 let cli = Cli::try_parse_from(["pulpo"]).unwrap();
4228 assert!(cli.command.is_none());
4229 assert!(cli.path.is_none());
4230 }
4231
4232 #[test]
4233 fn test_derive_session_name_simple() {
4234 assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
4235 }
4236
4237 #[test]
4238 fn test_derive_session_name_with_special_chars() {
4239 assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
4240 }
4241
4242 #[test]
4243 fn test_derive_session_name_root() {
4244 assert_eq!(derive_session_name("/"), "session");
4245 }
4246
4247 #[test]
4248 fn test_derive_session_name_dots() {
4249 assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
4250 }
4251
4252 #[test]
4253 fn test_resolve_path_absolute() {
4254 assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
4255 }
4256
4257 #[test]
4258 fn test_resolve_path_relative() {
4259 let resolved = resolve_path("my-repo");
4260 assert!(resolved.ends_with("my-repo"));
4261 assert!(resolved.starts_with('/'));
4262 }
4263
4264 #[tokio::test]
4265 async fn test_execute_no_args_shows_help() {
4266 let node = start_test_server().await;
4267 let cli = Cli {
4268 node,
4269 token: None,
4270 path: None,
4271 command: None,
4272 };
4273 let result = execute(&cli).await.unwrap();
4274 assert!(
4275 result.is_empty(),
4276 "no-args should return empty string after printing help"
4277 );
4278 }
4279
4280 #[tokio::test]
4281 async fn test_execute_path_shortcut() {
4282 let node = start_test_server().await;
4283 let cli = Cli {
4284 node,
4285 token: None,
4286 path: Some("/tmp".into()),
4287 command: None,
4288 };
4289 let result = execute(&cli).await.unwrap();
4290 assert!(result.contains("Detached from session"));
4291 }
4292
4293 #[tokio::test]
4294 async fn test_deduplicate_session_name_no_conflict() {
4295 let base = "http://127.0.0.1:1";
4297 let client = reqwest::Client::new();
4298 let name = deduplicate_session_name(&client, base, "fresh", None).await;
4299 assert_eq!(name, "fresh");
4300 }
4301
4302 #[tokio::test]
4303 async fn test_deduplicate_session_name_with_conflict() {
4304 use axum::{Router, routing::get};
4305 use std::sync::atomic::{AtomicU32, Ordering};
4306
4307 let call_count = std::sync::Arc::new(AtomicU32::new(0));
4308 let counter = call_count.clone();
4309 let app = Router::new()
4310 .route(
4311 "/api/v1/sessions/{id}",
4312 get(move || {
4313 let c = counter.clone();
4314 async move {
4315 let n = c.fetch_add(1, Ordering::SeqCst);
4316 if n == 0 {
4317 (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4319 } else {
4320 (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4322 }
4323 }
4324 }),
4325 )
4326 .route(
4327 "/api/v1/peers",
4328 get(|| async {
4329 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4330 }),
4331 );
4332 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4333 let addr = listener.local_addr().unwrap();
4334 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4335 let base = format!("http://127.0.0.1:{}", addr.port());
4336 let client = reqwest::Client::new();
4337 let name = deduplicate_session_name(&client, &base, "repo", None).await;
4338 assert_eq!(name, "repo-2");
4339 }
4340
4341 #[test]
4344 fn test_node_needs_resolution() {
4345 assert!(!node_needs_resolution("localhost:7433"));
4346 assert!(!node_needs_resolution("mac-mini:7433"));
4347 assert!(!node_needs_resolution("10.0.0.1:7433"));
4348 assert!(!node_needs_resolution("[::1]:7433"));
4349 assert!(node_needs_resolution("mac-mini"));
4350 assert!(node_needs_resolution("linux-server"));
4351 assert!(node_needs_resolution("localhost"));
4352 }
4353
4354 #[tokio::test]
4355 async fn test_resolve_node_with_port() {
4356 let client = reqwest::Client::new();
4357 let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4358 assert_eq!(addr, "mac-mini:7433");
4359 assert!(token.is_none());
4360 }
4361
4362 #[tokio::test]
4363 async fn test_resolve_node_fallback_appends_port() {
4364 let client = reqwest::Client::new();
4367 let (addr, token) = resolve_node(&client, "unknown-host").await;
4368 assert_eq!(addr, "unknown-host:7433");
4369 assert!(token.is_none());
4370 }
4371
4372 #[cfg(not(coverage))]
4373 #[tokio::test]
4374 async fn test_resolve_node_finds_peer() {
4375 use axum::{Router, routing::get};
4376
4377 let app = Router::new()
4378 .route(
4379 "/api/v1/peers",
4380 get(|| async {
4381 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4382 }),
4383 )
4384 .route(
4385 "/api/v1/config",
4386 get(|| async {
4387 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4388 }),
4389 );
4390
4391 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4393 return;
4394 };
4395 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4396
4397 let client = reqwest::Client::new();
4398 let (addr, token) = resolve_node(&client, "mac-mini").await;
4399 assert_eq!(addr, "10.0.0.5:7433");
4400 assert_eq!(token, Some("peer-secret".into()));
4401 }
4402
4403 #[cfg(not(coverage))]
4404 #[tokio::test]
4405 async fn test_resolve_node_peer_no_token() {
4406 use axum::{Router, routing::get};
4407
4408 let app = Router::new()
4409 .route(
4410 "/api/v1/peers",
4411 get(|| async {
4412 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4413 }),
4414 )
4415 .route(
4416 "/api/v1/config",
4417 get(|| async {
4418 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4419 }),
4420 );
4421
4422 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4423 return; };
4425 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4426
4427 let client = reqwest::Client::new();
4428 let (addr, token) = resolve_node(&client, "test-peer").await;
4429 assert_eq!(addr, "10.0.0.9:7433");
4430 assert!(token.is_none()); }
4432
4433 #[tokio::test]
4434 async fn test_execute_with_peer_name_resolution() {
4435 let cli = Cli {
4439 node: "nonexistent-peer".into(),
4440 token: None,
4441 command: Some(Commands::List { all: false }),
4442 path: None,
4443 };
4444 let result = execute(&cli).await;
4445 assert!(result.is_err());
4447 }
4448
4449 #[test]
4452 fn test_cli_parse_spawn_auto() {
4453 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
4454 assert!(matches!(
4455 &cli.command,
4456 Some(Commands::Spawn { auto, .. }) if *auto
4457 ));
4458 }
4459
4460 #[test]
4461 fn test_cli_parse_spawn_auto_default() {
4462 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
4463 assert!(matches!(
4464 &cli.command,
4465 Some(Commands::Spawn { auto, .. }) if !auto
4466 ));
4467 }
4468
4469 #[tokio::test]
4470 async fn test_select_best_node_coverage_stub() {
4471 let client = reqwest::Client::new();
4473 let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4476 }
4477
4478 #[cfg(not(coverage))]
4479 #[tokio::test]
4480 async fn test_select_best_node_picks_least_loaded() {
4481 use axum::{Router, routing::get};
4482
4483 let app = Router::new().route(
4484 "/api/v1/peers",
4485 get(|| async {
4486 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4487 }),
4488 );
4489 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4490 let addr = listener.local_addr().unwrap();
4491 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4492 let base = format!("http://127.0.0.1:{}", addr.port());
4493
4494 let client = reqwest::Client::new();
4495 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4496 assert_eq!(name, "idle");
4500 assert_eq!(addr, "idle:7433");
4501 }
4502
4503 #[cfg(not(coverage))]
4504 #[tokio::test]
4505 async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4506 use axum::{Router, routing::get};
4507
4508 let app = Router::new().route(
4509 "/api/v1/peers",
4510 get(|| async {
4511 r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4512 }),
4513 );
4514 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4515 let addr = listener.local_addr().unwrap();
4516 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4517 let base = format!("http://127.0.0.1:{}", addr.port());
4518
4519 let client = reqwest::Client::new();
4520 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4521 assert_eq!(name, "my-mac");
4522 assert_eq!(addr, "localhost:7433");
4523 }
4524
4525 #[cfg(not(coverage))]
4526 #[tokio::test]
4527 async fn test_select_best_node_empty_peers_falls_back_to_local() {
4528 use axum::{Router, routing::get};
4529
4530 let app = Router::new().route(
4531 "/api/v1/peers",
4532 get(|| async {
4533 r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4534 }),
4535 );
4536 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4537 let addr = listener.local_addr().unwrap();
4538 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4539 let base = format!("http://127.0.0.1:{}", addr.port());
4540
4541 let client = reqwest::Client::new();
4542 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4543 assert_eq!(name, "solo");
4544 assert_eq!(addr, "localhost:7433");
4545 }
4546
4547 #[cfg(not(coverage))]
4548 #[tokio::test]
4549 async fn test_execute_spawn_auto_selects_node() {
4550 use axum::{
4551 Router,
4552 http::StatusCode,
4553 routing::{get, post},
4554 };
4555
4556 let create_json = test_create_response_json();
4557
4558 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4560 let addr = listener.local_addr().unwrap();
4561 let node = format!("127.0.0.1:{}", addr.port());
4562 let peer_addr = node.clone();
4563
4564 let app = Router::new()
4565 .route(
4566 "/api/v1/peers",
4567 get(move || {
4568 let peer_addr = peer_addr.clone();
4569 async move {
4570 format!(
4571 r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
4572 )
4573 }
4574 }),
4575 )
4576 .route(
4577 "/api/v1/sessions",
4578 post(move || async move {
4579 (StatusCode::CREATED, create_json.clone())
4580 }),
4581 );
4582
4583 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4584
4585 let cli = Cli {
4586 node,
4587 token: None,
4588 command: Some(Commands::Spawn {
4589 name: Some("test".into()),
4590 workdir: Some("/tmp/repo".into()),
4591 ink: None,
4592 description: None,
4593 detach: true,
4594 idle_threshold: None,
4595 auto: true,
4596 worktree: false,
4597 worktree_base: None,
4598 runtime: None,
4599 secret: vec![],
4600 command: vec!["echo".into(), "hello".into()],
4601 }),
4602 path: None,
4603 };
4604 let result = execute(&cli).await.unwrap();
4605 assert!(result.contains("Created session"));
4606 }
4607
4608 #[cfg(not(coverage))]
4609 #[tokio::test]
4610 async fn test_select_best_node_peer_no_session_count() {
4611 use axum::{Router, routing::get};
4612
4613 let app = Router::new().route(
4614 "/api/v1/peers",
4615 get(|| async {
4616 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4617 }),
4618 );
4619 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4620 let addr = listener.local_addr().unwrap();
4621 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4622 let base = format!("http://127.0.0.1:{}", addr.port());
4623
4624 let client = reqwest::Client::new();
4625 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4626 assert_eq!(name, "fresh");
4628 assert_eq!(addr, "fresh:7433");
4629 }
4630
4631 #[test]
4634 fn test_cli_parse_secret_set() {
4635 let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_TOKEN", "abc123"]).unwrap();
4636 assert!(matches!(
4637 &cli.command,
4638 Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
4639 if name == "MY_TOKEN" && value == "abc123" && env.is_none()
4640 ));
4641 }
4642
4643 #[test]
4644 fn test_cli_parse_secret_list() {
4645 let cli = Cli::try_parse_from(["pulpo", "secret", "list"]).unwrap();
4646 assert!(matches!(
4647 &cli.command,
4648 Some(Commands::Secret {
4649 action: SecretAction::List
4650 })
4651 ));
4652 }
4653
4654 #[test]
4655 fn test_cli_parse_secret_list_alias() {
4656 let cli = Cli::try_parse_from(["pulpo", "secret", "ls"]).unwrap();
4657 assert!(matches!(
4658 &cli.command,
4659 Some(Commands::Secret {
4660 action: SecretAction::List
4661 })
4662 ));
4663 }
4664
4665 #[test]
4666 fn test_cli_parse_secret_delete() {
4667 let cli = Cli::try_parse_from(["pulpo", "secret", "delete", "MY_TOKEN"]).unwrap();
4668 assert!(matches!(
4669 &cli.command,
4670 Some(Commands::Secret { action: SecretAction::Delete { name } })
4671 if name == "MY_TOKEN"
4672 ));
4673 }
4674
4675 #[test]
4676 fn test_cli_parse_secret_delete_alias() {
4677 let cli = Cli::try_parse_from(["pulpo", "secret", "rm", "MY_TOKEN"]).unwrap();
4678 assert!(matches!(
4679 &cli.command,
4680 Some(Commands::Secret { action: SecretAction::Delete { name } })
4681 if name == "MY_TOKEN"
4682 ));
4683 }
4684
4685 #[test]
4686 fn test_cli_parse_secret_alias() {
4687 let cli = Cli::try_parse_from(["pulpo", "sec", "list"]).unwrap();
4688 assert!(matches!(
4689 &cli.command,
4690 Some(Commands::Secret {
4691 action: SecretAction::List
4692 })
4693 ));
4694 }
4695
4696 #[test]
4697 fn test_format_secrets_empty() {
4698 let secrets: Vec<serde_json::Value> = vec![];
4699 assert_eq!(format_secrets(&secrets), "No secrets configured.");
4700 }
4701
4702 #[test]
4703 fn test_format_secrets_with_entries() {
4704 let secrets = vec![
4705 serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4706 serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4707 ];
4708 let output = format_secrets(&secrets);
4709 assert!(output.contains("GITHUB_TOKEN"));
4710 assert!(output.contains("NPM_TOKEN"));
4711 assert!(output.contains("NAME"));
4712 assert!(output.contains("ENV"));
4713 assert!(output.contains("CREATED"));
4714 }
4715
4716 #[test]
4717 fn test_format_secrets_with_env() {
4718 let secrets = vec![
4719 serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4720 serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4721 ];
4722 let output = format_secrets(&secrets);
4723 assert!(output.contains("GH_WORK"));
4724 assert!(output.contains("GITHUB_TOKEN"));
4725 assert!(output.contains("NPM_TOKEN"));
4726 }
4727
4728 #[test]
4729 fn test_format_secrets_short_timestamp() {
4730 let secrets = vec![serde_json::json!({"name": "KEY", "created_at": "now"})];
4731 let output = format_secrets(&secrets);
4732 assert!(output.contains("now"));
4733 }
4734
4735 #[test]
4736 fn test_format_schedules_short_last_run_at() {
4737 let schedules = vec![serde_json::json!({
4739 "name": "test",
4740 "cron": "* * * * *",
4741 "enabled": true,
4742 "last_run_at": "short",
4743 "target_node": null
4744 })];
4745 let output = format_schedules(&schedules);
4746 assert!(output.contains("short"));
4747 }
4748
4749 #[test]
4750 fn test_format_sessions_multibyte_command_truncation() {
4751 use chrono::Utc;
4752 use pulpo_common::session::SessionStatus;
4753 use uuid::Uuid;
4754
4755 let sessions = vec![Session {
4757 id: Uuid::nil(),
4758 name: "test".into(),
4759 workdir: "/tmp".into(),
4760 command: "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'".into(),
4761 description: None,
4762 status: SessionStatus::Active,
4763 exit_code: None,
4764 backend_session_id: None,
4765 output_snapshot: None,
4766 metadata: None,
4767 ink: None,
4768 intervention_code: None,
4769 intervention_reason: None,
4770 intervention_at: None,
4771 last_output_at: None,
4772 idle_since: None,
4773 idle_threshold_secs: None,
4774 worktree_path: None,
4775 worktree_branch: None,
4776 runtime: Runtime::Tmux,
4777 created_at: Utc::now(),
4778 updated_at: Utc::now(),
4779 }];
4780 let output = format_sessions(&sessions);
4781 assert!(output.contains("..."));
4782 }
4783}