1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5 AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6 PeersResponse,
7};
8#[cfg(test)]
9use pulpo_common::session::Runtime;
10use pulpo_common::session::Session;
11
12#[derive(Parser, Debug)]
13#[command(
14 name = "pulpo",
15 about = "Manage agent sessions across your machines",
16 version = env!("PULPO_VERSION"),
17 args_conflicts_with_subcommands = true
18)]
19pub struct Cli {
20 #[arg(long, default_value = "localhost:7433")]
22 pub node: String,
23
24 #[arg(long)]
26 pub token: Option<String>,
27
28 #[command(subcommand)]
29 pub command: Option<Commands>,
30
31 #[arg(value_name = "PATH")]
33 pub path: Option<String>,
34}
35
36#[derive(Subcommand, Debug)]
37#[allow(clippy::large_enum_variant)]
38pub enum Commands {
39 #[command(visible_alias = "a")]
41 Attach {
42 name: String,
44 },
45
46 #[command(visible_alias = "i", visible_alias = "send")]
48 Input {
49 name: String,
51 text: Option<String>,
53 },
54
55 #[command(visible_alias = "s")]
57 Spawn {
58 name: Option<String>,
60
61 #[arg(long)]
63 workdir: Option<String>,
64
65 #[arg(long)]
67 ink: Option<String>,
68
69 #[arg(long)]
71 description: Option<String>,
72
73 #[arg(short, long)]
75 detach: bool,
76
77 #[arg(long)]
79 idle_threshold: Option<u32>,
80
81 #[arg(long)]
83 auto: bool,
84
85 #[arg(long)]
87 worktree: bool,
88
89 #[arg(long = "worktree-base")]
91 worktree_base: Option<String>,
92
93 #[arg(long)]
95 runtime: Option<String>,
96
97 #[arg(long)]
99 secret: Vec<String>,
100
101 #[arg(last = true)]
103 command: Vec<String>,
104 },
105
106 #[command(visible_alias = "ls")]
108 List {
109 #[arg(short, long)]
111 all: bool,
112 },
113
114 #[command(visible_alias = "l")]
116 Logs {
117 name: String,
119
120 #[arg(long, default_value = "100")]
122 lines: usize,
123
124 #[arg(short, long)]
126 follow: bool,
127 },
128
129 #[command(visible_alias = "k", alias = "kill")]
131 Stop {
132 #[arg(required = true)]
134 names: Vec<String>,
135
136 #[arg(long, short = 'p')]
138 purge: bool,
139 },
140
141 Cleanup,
143
144 #[command(visible_alias = "r")]
146 Resume {
147 name: String,
149 },
150
151 #[command(visible_alias = "n")]
153 Nodes,
154
155 #[command(visible_alias = "iv")]
157 Interventions {
158 name: String,
160 },
161
162 Ui,
164
165 #[command(visible_alias = "sched")]
167 Schedule {
168 #[command(subcommand)]
169 action: ScheduleAction,
170 },
171
172 #[command(visible_alias = "sec")]
174 Secret {
175 #[command(subcommand)]
176 action: SecretAction,
177 },
178
179 #[command(visible_alias = "wt")]
181 Worktree {
182 #[command(subcommand)]
183 action: WorktreeAction,
184 },
185}
186
187#[derive(Subcommand, Debug)]
188pub enum SecretAction {
189 Set {
191 name: String,
193 value: String,
195 #[arg(long)]
197 env: Option<String>,
198 },
199 #[command(visible_alias = "ls")]
201 List,
202 #[command(visible_alias = "rm")]
204 Delete {
205 name: String,
207 },
208}
209
210#[derive(Subcommand, Debug)]
211pub enum WorktreeAction {
212 #[command(visible_alias = "ls")]
214 List,
215}
216
217#[derive(Subcommand, Debug)]
218pub enum ScheduleAction {
219 #[command(alias = "install")]
221 Add {
222 name: String,
224 cron: String,
226 #[arg(long)]
228 workdir: Option<String>,
229 #[arg(long)]
231 node: Option<String>,
232 #[arg(long)]
234 ink: Option<String>,
235 #[arg(long)]
237 description: Option<String>,
238 #[arg(last = true)]
240 command: Vec<String>,
241 },
242 #[command(alias = "ls")]
244 List,
245 #[command(alias = "rm")]
247 Remove {
248 name: String,
250 },
251 Pause {
253 name: String,
255 },
256 Resume {
258 name: String,
260 },
261}
262
263const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
265
266fn resolve_path(path: &str) -> String {
268 let p = std::path::Path::new(path);
269 if p.is_absolute() {
270 path.to_owned()
271 } else {
272 std::env::current_dir().map_or_else(
273 |_| path.to_owned(),
274 |cwd| cwd.join(p).to_string_lossy().into_owned(),
275 )
276 }
277}
278
279fn derive_session_name(path: &str) -> String {
281 let basename = std::path::Path::new(path)
282 .file_name()
283 .and_then(|n| n.to_str())
284 .unwrap_or("session");
285 let kebab: String = basename
287 .chars()
288 .map(|c| {
289 if c.is_ascii_alphanumeric() {
290 c.to_ascii_lowercase()
291 } else {
292 '-'
293 }
294 })
295 .collect();
296 let mut result = String::new();
298 for c in kebab.chars() {
299 if c == '-' && result.ends_with('-') {
300 continue;
301 }
302 result.push(c);
303 }
304 let result = result.trim_matches('-').to_owned();
305 if result.is_empty() {
306 "session".to_owned()
307 } else {
308 result
309 }
310}
311
312async fn deduplicate_session_name(
314 client: &reqwest::Client,
315 base: &str,
316 name: &str,
317 token: Option<&str>,
318) -> String {
319 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
321 .send()
322 .await;
323 match resp {
324 Ok(r) if r.status().is_success() => {
325 for i in 2..=99 {
327 let candidate = format!("{name}-{i}");
328 let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
329 .send()
330 .await;
331 match resp {
332 Ok(r) if r.status().is_success() => {}
333 _ => return candidate,
334 }
335 }
336 format!("{name}-100")
337 }
338 _ => name.to_owned(),
339 }
340}
341
342pub fn base_url(node: &str) -> String {
344 format!("http://{node}")
345}
346
347#[derive(serde::Deserialize)]
349struct OutputResponse {
350 output: String,
351}
352
353fn format_repo(session: &Session) -> String {
356 let basename = session
357 .workdir
358 .rsplit('/')
359 .next()
360 .unwrap_or(&session.workdir)
361 .to_owned();
362 let mut display = match session.git_branch.as_deref() {
363 Some(branch) => format!("{basename}@{branch}"),
364 None => basename,
365 };
366
367 let ins = session.git_insertions.unwrap_or(0);
369 let del = session.git_deletions.unwrap_or(0);
370 if ins > 0 || del > 0 {
371 display = format!("{display} +{ins}/-{del}");
372 }
373
374 if let Some(ahead) = session.git_ahead
376 && ahead > 0
377 {
378 display = format!("{display} \u{2191}{ahead}");
379 }
380
381 if display.len() <= 30 {
382 display
383 } else {
384 let truncated: String = display.chars().take(27).collect();
385 format!("{truncated}...")
386 }
387}
388
389fn format_sessions(sessions: &[Session]) -> String {
390 if sessions.is_empty() {
391 return "No sessions.".into();
392 }
393 let mut lines = vec![format!(
394 "{:<10} {:<24} {:<12} {:<32} {}",
395 "ID", "NAME", "STATUS", "REPO", "COMMAND"
396 )];
397 for s in sessions {
398 let cmd_display = if s.command.len() > 40 {
399 let truncated: String = s.command.chars().take(37).collect();
400 format!("{truncated}...")
401 } else {
402 s.command.clone()
403 };
404 let short_id = &s.id.to_string()[..8];
405 let has_pr = s
406 .metadata
407 .as_ref()
408 .is_some_and(|m| m.contains_key("pr_url"));
409 let has_error = s
410 .metadata
411 .as_ref()
412 .is_some_and(|m| m.contains_key("error_status"));
413 let mut name_display = s.name.clone();
414 if s.worktree_path.is_some() {
415 name_display = format!("{name_display} [wt]");
416 }
417 if has_pr {
418 name_display = format!("{name_display} [PR]");
419 }
420 if has_error {
421 name_display = format!("{name_display} [!]");
422 }
423 let repo_display = format_repo(s);
424 lines.push(format!(
425 "{:<10} {:<24} {:<12} {:<32} {}",
426 short_id, name_display, s.status, repo_display, cmd_display
427 ));
428 }
429 lines.join("\n")
430}
431
432fn format_nodes(resp: &PeersResponse) -> String {
434 let mut lines = vec![format!(
435 "{:<20} {:<25} {:<10} {}",
436 "NAME", "ADDRESS", "STATUS", "SESSIONS"
437 )];
438 lines.push(format!(
439 "{:<20} {:<25} {:<10} {}",
440 resp.local.name, "(local)", "online", "-"
441 ));
442 for p in &resp.peers {
443 let sessions = p
444 .session_count
445 .map_or_else(|| "-".into(), |c| c.to_string());
446 lines.push(format!(
447 "{:<20} {:<25} {:<10} {}",
448 p.name, p.address, p.status, sessions
449 ));
450 }
451 lines.join("\n")
452}
453
454fn format_interventions(events: &[InterventionEventResponse]) -> String {
456 if events.is_empty() {
457 return "No intervention events.".into();
458 }
459 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
460 for e in events {
461 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
462 }
463 lines.join("\n")
464}
465
466#[cfg_attr(coverage, allow(dead_code))]
468fn build_open_command(url: &str) -> std::process::Command {
469 #[cfg(target_os = "macos")]
470 {
471 let mut cmd = std::process::Command::new("open");
472 cmd.arg(url);
473 cmd
474 }
475 #[cfg(target_os = "linux")]
476 {
477 let mut cmd = std::process::Command::new("xdg-open");
478 cmd.arg(url);
479 cmd
480 }
481 #[cfg(target_os = "windows")]
482 {
483 let mut cmd = std::process::Command::new("cmd");
484 cmd.args(["/C", "start", url]);
485 cmd
486 }
487 #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
488 {
489 let mut cmd = std::process::Command::new("xdg-open");
491 cmd.arg(url);
492 cmd
493 }
494}
495
496#[cfg(not(coverage))]
498fn open_browser(url: &str) -> Result<()> {
499 build_open_command(url).status()?;
500 Ok(())
501}
502
503#[cfg(coverage)]
505fn open_browser(_url: &str) -> Result<()> {
506 Ok(())
507}
508
509#[cfg_attr(coverage, allow(dead_code))]
512fn build_attach_command(backend_session_id: &str) -> std::process::Command {
513 if let Some(container) = backend_session_id.strip_prefix("docker:") {
515 let mut cmd = std::process::Command::new("docker");
516 cmd.args(["exec", "-it", container, "/bin/sh"]);
517 return cmd;
518 }
519 #[cfg(not(target_os = "windows"))]
521 {
522 let mut cmd = std::process::Command::new("tmux");
523 cmd.args(["attach-session", "-t", backend_session_id]);
524 cmd
525 }
526 #[cfg(target_os = "windows")]
527 {
528 let mut cmd = std::process::Command::new("cmd");
530 cmd.args([
531 "/C",
532 "echo",
533 "Attach not available on Windows. Use the web UI or --runtime docker.",
534 ]);
535 cmd
536 }
537}
538
539#[cfg(not(any(test, coverage, target_os = "windows")))]
541fn attach_session(backend_session_id: &str) -> Result<()> {
542 let status = build_attach_command(backend_session_id).status()?;
543 if !status.success() {
544 anyhow::bail!("attach failed with {status}");
545 }
546 Ok(())
547}
548
549#[cfg(all(target_os = "windows", not(test), not(coverage)))]
551fn attach_session(_backend_session_id: &str) -> Result<()> {
552 eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
553 Ok(())
554}
555
556#[cfg(any(test, coverage))]
558#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
559fn attach_session(_backend_session_id: &str) -> Result<()> {
560 Ok(())
561}
562
563fn api_error(text: &str) -> anyhow::Error {
565 serde_json::from_str::<serde_json::Value>(text)
566 .ok()
567 .and_then(|v| v["error"].as_str().map(String::from))
568 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
569}
570
571async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
573 if resp.status().is_success() {
574 Ok(resp.text().await?)
575 } else {
576 let text = resp.text().await?;
577 Err(api_error(&text))
578 }
579}
580
581fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
583 if err.is_connect() {
584 anyhow::anyhow!(
585 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
586 )
587 } else {
588 anyhow::anyhow!("Network error connecting to {node}: {err}")
589 }
590}
591
592fn is_localhost(node: &str) -> bool {
594 let host = node.split(':').next().unwrap_or(node);
595 host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
596}
597
598async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
600 let resp = client
601 .get(format!("{base}/api/v1/auth/token"))
602 .send()
603 .await
604 .ok()?;
605 let body: AuthTokenResponse = resp.json().await.ok()?;
606 if body.token.is_empty() {
607 None
608 } else {
609 Some(body.token)
610 }
611}
612
613async fn resolve_token(
615 client: &reqwest::Client,
616 base: &str,
617 node: &str,
618 explicit: Option<&str>,
619) -> Option<String> {
620 if let Some(t) = explicit {
621 return Some(t.to_owned());
622 }
623 if is_localhost(node) {
624 return discover_token(client, base).await;
625 }
626 None
627}
628
629fn node_needs_resolution(node: &str) -> bool {
631 !node.contains(':')
632}
633
634#[cfg(not(coverage))]
641async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
642 if !node_needs_resolution(node) {
644 return (node.to_owned(), None);
645 }
646
647 let local_base = "http://localhost:7433";
649 let mut resolved_address: Option<String> = None;
650
651 if let Ok(resp) = client
652 .get(format!("{local_base}/api/v1/peers"))
653 .send()
654 .await
655 && let Ok(peers_resp) = resp.json::<PeersResponse>().await
656 {
657 for peer in &peers_resp.peers {
658 if peer.name == node {
659 resolved_address = Some(peer.address.clone());
660 break;
661 }
662 }
663 }
664
665 let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
666
667 let peer_token = if let Ok(resp) = client
669 .get(format!("{local_base}/api/v1/config"))
670 .send()
671 .await
672 && let Ok(config) = resp.json::<ConfigResponse>().await
673 && let Some(entry) = config.peers.get(node)
674 {
675 entry.token().map(String::from)
676 } else {
677 None
678 };
679
680 (address, peer_token)
681}
682
683#[cfg(coverage)]
685async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
686 if node_needs_resolution(node) {
687 (format!("{node}:7433"), None)
688 } else {
689 (node.to_owned(), None)
690 }
691}
692
693fn authed_get(
695 client: &reqwest::Client,
696 url: String,
697 token: Option<&str>,
698) -> reqwest::RequestBuilder {
699 let req = client.get(url);
700 if let Some(t) = token {
701 req.bearer_auth(t)
702 } else {
703 req
704 }
705}
706
707fn authed_post(
709 client: &reqwest::Client,
710 url: String,
711 token: Option<&str>,
712) -> reqwest::RequestBuilder {
713 let req = client.post(url);
714 if let Some(t) = token {
715 req.bearer_auth(t)
716 } else {
717 req
718 }
719}
720
721fn authed_delete(
723 client: &reqwest::Client,
724 url: String,
725 token: Option<&str>,
726) -> reqwest::RequestBuilder {
727 let req = client.delete(url);
728 if let Some(t) = token {
729 req.bearer_auth(t)
730 } else {
731 req
732 }
733}
734
735#[cfg(not(coverage))]
737fn authed_put(
738 client: &reqwest::Client,
739 url: String,
740 token: Option<&str>,
741) -> reqwest::RequestBuilder {
742 let req = client.put(url);
743 if let Some(t) = token {
744 req.bearer_auth(t)
745 } else {
746 req
747 }
748}
749
750async fn fetch_output(
752 client: &reqwest::Client,
753 base: &str,
754 name: &str,
755 lines: usize,
756 token: Option<&str>,
757) -> Result<String> {
758 let resp = authed_get(
759 client,
760 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
761 token,
762 )
763 .send()
764 .await?;
765 let text = ok_or_api_error(resp).await?;
766 let output: OutputResponse = serde_json::from_str(&text)?;
767 Ok(output.output)
768}
769
770async fn fetch_session_status(
772 client: &reqwest::Client,
773 base: &str,
774 name: &str,
775 token: Option<&str>,
776) -> Result<String> {
777 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
778 .send()
779 .await?;
780 let text = ok_or_api_error(resp).await?;
781 let session: Session = serde_json::from_str(&text)?;
782 Ok(session.status.to_string())
783}
784
785async fn check_session_alive(
789 client: &reqwest::Client,
790 base: &str,
791 session_id: &str,
792 token: Option<&str>,
793) -> Result<()> {
794 for _ in 0..3 {
796 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
797 let resp = authed_get(
799 client,
800 format!("{base}/api/v1/sessions/{session_id}"),
801 token,
802 )
803 .send()
804 .await;
805 if let Ok(resp) = resp
806 && let Ok(text) = ok_or_api_error(resp).await
807 && let Ok(session) = serde_json::from_str::<Session>(&text)
808 {
809 match session.status.to_string().as_str() {
810 "creating" => continue,
811 "lost" | "stopped" => {
812 anyhow::bail!(
813 "Session \"{}\" exited immediately — the command may have failed.\n Check logs: pulpo logs {}",
814 session.name,
815 session.name
816 );
817 }
818 _ => return Ok(()),
819 }
820 }
821 break;
823 }
824 Ok(())
825}
826
827fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
834 if prev.is_empty() {
835 return new;
836 }
837
838 let prev_lines: Vec<&str> = prev.lines().collect();
839 let new_lines: Vec<&str> = new.lines().collect();
840
841 if new_lines.is_empty() {
842 return "";
843 }
844
845 let last_prev = prev_lines[prev_lines.len() - 1];
847
848 for i in (0..new_lines.len()).rev() {
850 if new_lines[i] == last_prev {
851 let overlap_len = prev_lines.len().min(i + 1);
853 let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
854 let new_overlap = &new_lines[i + 1 - overlap_len..=i];
855 if prev_tail == new_overlap {
856 if i + 1 < new_lines.len() {
857 let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
859 return new.get(consumed.min(new.len())..).unwrap_or("");
860 }
861 return "";
862 }
863 }
864 }
865
866 new
868}
869
870async fn follow_logs(
872 client: &reqwest::Client,
873 base: &str,
874 name: &str,
875 lines: usize,
876 token: Option<&str>,
877 writer: &mut (dyn std::io::Write + Send),
878) -> Result<()> {
879 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
880 write!(writer, "{prev_output}")?;
881
882 let mut unchanged_ticks: u32 = 0;
883
884 loop {
885 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
886
887 let new_output = fetch_output(client, base, name, lines, token).await?;
889
890 let diff = diff_output(&prev_output, &new_output);
891 if diff.is_empty() {
892 unchanged_ticks += 1;
893 } else {
894 write!(writer, "{diff}")?;
895 unchanged_ticks = 0;
896 }
897
898 if new_output.contains(AGENT_EXIT_MARKER) {
900 break;
901 }
902
903 prev_output = new_output;
904
905 if unchanged_ticks >= 3 {
907 let status = fetch_session_status(client, base, name, token).await?;
908 let is_terminal = status == "ready" || status == "stopped" || status == "lost";
909 if is_terminal {
910 break;
911 }
912 }
913 }
914 Ok(())
915}
916
917#[cfg(not(coverage))]
921async fn execute_schedule(
922 client: &reqwest::Client,
923 action: &ScheduleAction,
924 base: &str,
925 token: Option<&str>,
926) -> Result<String> {
927 match action {
928 ScheduleAction::Add {
929 name,
930 cron,
931 workdir,
932 node,
933 ink,
934 description,
935 command,
936 } => {
937 let cmd = if command.is_empty() {
938 None
939 } else {
940 Some(command.join(" "))
941 };
942 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
943 std::env::current_dir()
944 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
945 });
946 let mut body = serde_json::json!({
947 "name": name,
948 "cron": cron,
949 "workdir": resolved_workdir,
950 });
951 if let Some(c) = &cmd {
952 body["command"] = serde_json::json!(c);
953 }
954 if let Some(n) = node {
955 body["target_node"] = serde_json::json!(n);
956 }
957 if let Some(i) = ink {
958 body["ink"] = serde_json::json!(i);
959 }
960 if let Some(d) = description {
961 body["description"] = serde_json::json!(d);
962 }
963 let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
964 .json(&body)
965 .send()
966 .await?;
967 ok_or_api_error(resp).await?;
968 Ok(format!("Created schedule \"{name}\""))
969 }
970 ScheduleAction::List => {
971 let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
972 .send()
973 .await?;
974 let text = ok_or_api_error(resp).await?;
975 let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
976 Ok(format_schedules(&schedules))
977 }
978 ScheduleAction::Remove { name } => {
979 let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
980 .send()
981 .await?;
982 ok_or_api_error(resp).await?;
983 Ok(format!("Removed schedule \"{name}\""))
984 }
985 ScheduleAction::Pause { name } => {
986 let body = serde_json::json!({ "enabled": false });
987 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
988 .json(&body)
989 .send()
990 .await?;
991 ok_or_api_error(resp).await?;
992 Ok(format!("Paused schedule \"{name}\""))
993 }
994 ScheduleAction::Resume { name } => {
995 let body = serde_json::json!({ "enabled": true });
996 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
997 .json(&body)
998 .send()
999 .await?;
1000 ok_or_api_error(resp).await?;
1001 Ok(format!("Resumed schedule \"{name}\""))
1002 }
1003 }
1004}
1005
1006#[cfg(coverage)]
1008#[allow(clippy::unnecessary_wraps)]
1009async fn execute_schedule(
1010 _client: &reqwest::Client,
1011 _action: &ScheduleAction,
1012 _base: &str,
1013 _token: Option<&str>,
1014) -> Result<String> {
1015 Ok(String::new())
1016}
1017
1018#[cfg_attr(coverage, allow(dead_code))]
1022fn format_secrets(secrets: &[serde_json::Value]) -> String {
1023 if secrets.is_empty() {
1024 return "No secrets configured.".into();
1025 }
1026 let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
1027 for s in secrets {
1028 let name = s["name"].as_str().unwrap_or("?");
1029 let env_display = s["env"]
1030 .as_str()
1031 .map_or_else(|| name.to_owned(), String::from);
1032 let created = s["created_at"]
1033 .as_str()
1034 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1035 lines.push(format!("{name:<24} {env_display:<24} {created}"));
1036 }
1037 lines.join("\n")
1038}
1039
1040#[cfg(not(coverage))]
1042async fn execute_secret(
1043 client: &reqwest::Client,
1044 action: &SecretAction,
1045 base: &str,
1046 token: Option<&str>,
1047) -> Result<String> {
1048 match action {
1049 SecretAction::Set { name, value, env } => {
1050 let mut body = serde_json::json!({ "value": value });
1051 if let Some(e) = env {
1052 body["env"] = serde_json::json!(e);
1053 }
1054 let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
1055 .json(&body)
1056 .send()
1057 .await?;
1058 ok_or_api_error(resp).await?;
1059 Ok(format!("Secret \"{name}\" set."))
1060 }
1061 SecretAction::List => {
1062 let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
1063 .send()
1064 .await?;
1065 let text = ok_or_api_error(resp).await?;
1066 let parsed: serde_json::Value = serde_json::from_str(&text)?;
1067 let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1068 Ok(format_secrets(secrets))
1069 }
1070 SecretAction::Delete { name } => {
1071 let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1072 .send()
1073 .await?;
1074 ok_or_api_error(resp).await?;
1075 Ok(format!("Secret \"{name}\" deleted."))
1076 }
1077 }
1078}
1079
1080#[cfg(coverage)]
1082#[allow(clippy::unnecessary_wraps)]
1083async fn execute_secret(
1084 _client: &reqwest::Client,
1085 _action: &SecretAction,
1086 _base: &str,
1087 _token: Option<&str>,
1088) -> Result<String> {
1089 Ok(String::new())
1090}
1091
1092#[cfg(not(coverage))]
1094async fn execute_worktree(
1095 client: &reqwest::Client,
1096 action: &WorktreeAction,
1097 base: &str,
1098 token: Option<&str>,
1099) -> Result<String> {
1100 match action {
1101 WorktreeAction::List => {
1102 let resp = authed_get(client, format!("{base}/api/v1/sessions"), token)
1103 .send()
1104 .await?;
1105 let text = ok_or_api_error(resp).await?;
1106 let sessions: Vec<Session> = serde_json::from_str(&text)?;
1107 let wt_sessions: Vec<&Session> = sessions
1108 .iter()
1109 .filter(|s| s.worktree_path.is_some())
1110 .collect();
1111 Ok(format_worktree_sessions(&wt_sessions))
1112 }
1113 }
1114}
1115
1116#[cfg(coverage)]
1118#[allow(clippy::unnecessary_wraps)]
1119async fn execute_worktree(
1120 _client: &reqwest::Client,
1121 _action: &WorktreeAction,
1122 _base: &str,
1123 _token: Option<&str>,
1124) -> Result<String> {
1125 Ok(String::new())
1126}
1127
1128fn format_worktree_sessions(sessions: &[&Session]) -> String {
1130 if sessions.is_empty() {
1131 return "No worktree sessions.".into();
1132 }
1133 let mut lines = vec![format!(
1134 "{:<20} {:<20} {:<10} {}",
1135 "NAME", "BRANCH", "STATUS", "PATH"
1136 )];
1137 for s in sessions {
1138 let branch = s.worktree_branch.as_deref().unwrap_or("-");
1139 let path = s.worktree_path.as_deref().unwrap_or("-");
1140 lines.push(format!(
1141 "{:<20} {:<20} {:<10} {}",
1142 s.name, branch, s.status, path
1143 ));
1144 }
1145 lines.join("\n")
1146}
1147
1148#[cfg_attr(coverage, allow(dead_code))]
1150fn format_schedules(schedules: &[serde_json::Value]) -> String {
1151 if schedules.is_empty() {
1152 return "No schedules.".into();
1153 }
1154 let mut lines = vec![format!(
1155 "{:<20} {:<18} {:<8} {:<12} {}",
1156 "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1157 )];
1158 for s in schedules {
1159 let name = s["name"].as_str().unwrap_or("?");
1160 let cron = s["cron"].as_str().unwrap_or("?");
1161 let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1162 "yes"
1163 } else {
1164 "no"
1165 };
1166 let last_run = s["last_run_at"]
1167 .as_str()
1168 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1169 let node = s["target_node"].as_str().unwrap_or("local");
1170 lines.push(format!(
1171 "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1172 ));
1173 }
1174 lines.join("\n")
1175}
1176
1177#[cfg(not(coverage))]
1181async fn select_best_node(
1182 client: &reqwest::Client,
1183 base: &str,
1184 token: Option<&str>,
1185) -> Result<(String, String)> {
1186 let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1187 .send()
1188 .await?;
1189 let text = ok_or_api_error(resp).await?;
1190 let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1191
1192 let mut best: Option<(String, String, f64)> = None; for peer in &peers_resp.peers {
1196 if peer.status != pulpo_common::peer::PeerStatus::Online {
1197 continue;
1198 }
1199 let sessions = peer.session_count.unwrap_or(0);
1200 let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1201 #[allow(clippy::cast_precision_loss)]
1203 let score = sessions as f64 - (mem as f64 / 1024.0);
1204 if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1205 best = Some((peer.address.clone(), peer.name.clone(), score));
1206 }
1207 }
1208
1209 match best {
1211 Some((addr, name, _)) => Ok((addr, name)),
1212 None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1213 }
1214}
1215
1216#[cfg(coverage)]
1218#[allow(clippy::unnecessary_wraps)]
1219async fn select_best_node(
1220 _client: &reqwest::Client,
1221 _base: &str,
1222 _token: Option<&str>,
1223) -> Result<(String, String)> {
1224 Ok(("localhost:7433".into(), "local".into()))
1225}
1226
1227#[cfg(not(coverage))]
1230async fn ensure_daemon_running(client: &reqwest::Client, url: &str, node: &str) -> bool {
1231 if !is_localhost(node) {
1232 return true; }
1234 if client
1236 .get(format!("{url}/api/v1/health"))
1237 .timeout(std::time::Duration::from_secs(2))
1238 .send()
1239 .await
1240 .is_ok()
1241 {
1242 return true; }
1244
1245 eprintln!("pulpod is not running — starting it...");
1246
1247 let started = if cfg!(target_os = "macos") {
1249 std::process::Command::new("brew")
1250 .args(["services", "start", "pulpo"])
1251 .stdout(std::process::Stdio::null())
1252 .stderr(std::process::Stdio::null())
1253 .status()
1254 .is_ok_and(|s| s.success())
1255 } else {
1256 std::process::Command::new("systemctl")
1257 .args(["--user", "start", "pulpo"])
1258 .stdout(std::process::Stdio::null())
1259 .stderr(std::process::Stdio::null())
1260 .status()
1261 .is_ok_and(|s| s.success())
1262 };
1263
1264 if !started {
1265 if std::process::Command::new("pulpod")
1267 .stdout(std::process::Stdio::null())
1268 .stderr(std::process::Stdio::null())
1269 .spawn()
1270 .is_err()
1271 {
1272 eprintln!(
1273 "Failed to start pulpod. Install it with: brew install darioblanco/tap/pulpo"
1274 );
1275 return false;
1276 }
1277 }
1278
1279 for _ in 0..10 {
1281 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1282 if client
1283 .get(format!("{url}/api/v1/health"))
1284 .timeout(std::time::Duration::from_secs(1))
1285 .send()
1286 .await
1287 .is_ok()
1288 {
1289 eprintln!("pulpod started.");
1290 return true;
1291 }
1292 }
1293 eprintln!("pulpod did not start in time.");
1294 false
1295}
1296
1297#[cfg(coverage)]
1299async fn ensure_daemon_running(_client: &reqwest::Client, _url: &str, _node: &str) -> bool {
1300 true
1301}
1302
1303#[allow(clippy::too_many_lines)]
1305pub async fn execute(cli: &Cli) -> Result<String> {
1306 let client = reqwest::Client::new();
1307 let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
1308 let url = base_url(&resolved_node);
1309 let node = &resolved_node;
1310
1311 ensure_daemon_running(&client, &url, node).await;
1313
1314 let token = resolve_token(&client, &url, node, cli.token.as_deref())
1315 .await
1316 .or(peer_token);
1317
1318 if cli.command.is_none() && cli.path.is_none() {
1320 use clap::CommandFactory;
1322 let mut cmd = Cli::command();
1323 cmd.print_help()?;
1324 println!();
1325 return Ok(String::new());
1326 }
1327 if cli.command.is_none() {
1328 let path = cli.path.as_deref().unwrap_or(".");
1329 let resolved_workdir = resolve_path(path);
1330 let base_name = derive_session_name(&resolved_workdir);
1331 let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
1332 let body = serde_json::json!({
1333 "name": name,
1334 "workdir": resolved_workdir,
1335 });
1336 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1337 .json(&body)
1338 .send()
1339 .await
1340 .map_err(|e| friendly_error(&e, node))?;
1341 let text = ok_or_api_error(resp).await?;
1342 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1343 let msg = format!(
1344 "Created session \"{}\" ({})",
1345 resp.session.name, resp.session.id
1346 );
1347 let backend_id = resp
1348 .session
1349 .backend_session_id
1350 .as_deref()
1351 .unwrap_or(&resp.session.name);
1352 eprintln!("{msg}");
1353 attach_session(backend_id)?;
1356 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1357 }
1358
1359 match cli.command.as_ref().unwrap() {
1360 Commands::Attach { name } => {
1361 let resp = authed_get(
1363 &client,
1364 format!("{url}/api/v1/sessions/{name}"),
1365 token.as_deref(),
1366 )
1367 .send()
1368 .await
1369 .map_err(|e| friendly_error(&e, node))?;
1370 let text = ok_or_api_error(resp).await?;
1371 let session: Session = serde_json::from_str(&text)?;
1372 match session.status.to_string().as_str() {
1373 "lost" => {
1374 anyhow::bail!(
1375 "Session \"{name}\" is lost (agent process died). Resume it first:\n pulpo resume {name}"
1376 );
1377 }
1378 "stopped" => {
1379 anyhow::bail!(
1380 "Session \"{name}\" is {} — cannot attach to a stopped session.",
1381 session.status
1382 );
1383 }
1384 _ => {}
1385 }
1386 let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1387 attach_session(&backend_id)?;
1388 Ok(format!("Detached from session {name}."))
1389 }
1390 Commands::Input { name, text } => {
1391 let input_text = text.as_deref().unwrap_or("\n");
1392 let body = serde_json::json!({ "text": input_text });
1393 let resp = authed_post(
1394 &client,
1395 format!("{url}/api/v1/sessions/{name}/input"),
1396 token.as_deref(),
1397 )
1398 .json(&body)
1399 .send()
1400 .await
1401 .map_err(|e| friendly_error(&e, node))?;
1402 ok_or_api_error(resp).await?;
1403 Ok(format!("Sent input to session {name}."))
1404 }
1405 Commands::List { all } => {
1406 let list_url = if *all {
1407 format!("{url}/api/v1/sessions")
1408 } else {
1409 format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
1410 };
1411 let resp = authed_get(&client, list_url, token.as_deref())
1412 .send()
1413 .await
1414 .map_err(|e| friendly_error(&e, node))?;
1415 let text = ok_or_api_error(resp).await?;
1416 let sessions: Vec<Session> = serde_json::from_str(&text)?;
1417 Ok(format_sessions(&sessions))
1418 }
1419 Commands::Nodes => {
1420 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1421 .send()
1422 .await
1423 .map_err(|e| friendly_error(&e, node))?;
1424 let text = ok_or_api_error(resp).await?;
1425 let resp: PeersResponse = serde_json::from_str(&text)?;
1426 Ok(format_nodes(&resp))
1427 }
1428 Commands::Spawn {
1429 workdir,
1430 name,
1431 ink,
1432 description,
1433 detach,
1434 idle_threshold,
1435 auto,
1436 worktree,
1437 worktree_base,
1438 runtime,
1439 secret,
1440 command,
1441 } => {
1442 let cmd = if command.is_empty() {
1443 None
1444 } else {
1445 Some(command.join(" "))
1446 };
1447 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1449 std::env::current_dir()
1450 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1451 });
1452 let resolved_name = if let Some(n) = name {
1454 n.clone()
1455 } else {
1456 let base_name = derive_session_name(&resolved_workdir);
1457 deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1458 };
1459 let mut body = serde_json::json!({
1460 "name": resolved_name,
1461 "workdir": resolved_workdir,
1462 });
1463 if let Some(c) = &cmd {
1464 body["command"] = serde_json::json!(c);
1465 }
1466 if let Some(i) = ink {
1467 body["ink"] = serde_json::json!(i);
1468 }
1469 if let Some(d) = description {
1470 body["description"] = serde_json::json!(d);
1471 }
1472 if let Some(t) = idle_threshold {
1473 body["idle_threshold_secs"] = serde_json::json!(t);
1474 }
1475 if *worktree || worktree_base.is_some() {
1477 body["worktree"] = serde_json::json!(true);
1478 if let Some(base) = worktree_base {
1479 body["worktree_base"] = serde_json::json!(base);
1480 eprintln!(
1481 "Worktree: branch {resolved_name} (from {base}) in ~/.pulpo/worktrees/{resolved_name}/"
1482 );
1483 } else {
1484 eprintln!(
1485 "Worktree: branch {resolved_name} in ~/.pulpo/worktrees/{resolved_name}/"
1486 );
1487 }
1488 }
1489 if let Some(rt) = runtime {
1490 body["runtime"] = serde_json::json!(rt);
1491 }
1492 if !secret.is_empty() {
1493 body["secrets"] = serde_json::json!(secret);
1494 }
1495 let spawn_url = if *auto {
1496 let (auto_addr, auto_name) =
1497 select_best_node(&client, &url, token.as_deref()).await?;
1498 eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1499 base_url(&auto_addr)
1500 } else {
1501 url.clone()
1502 };
1503 let resp = authed_post(
1504 &client,
1505 format!("{spawn_url}/api/v1/sessions"),
1506 token.as_deref(),
1507 )
1508 .json(&body)
1509 .send()
1510 .await
1511 .map_err(|e| friendly_error(&e, node))?;
1512 let text = ok_or_api_error(resp).await?;
1513 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1514 let msg = format!(
1515 "Created session \"{}\" ({})",
1516 resp.session.name, resp.session.id
1517 );
1518 if !detach {
1519 let backend_id = resp
1520 .session
1521 .backend_session_id
1522 .as_deref()
1523 .unwrap_or(&resp.session.name);
1524 eprintln!("{msg}");
1525 if cmd.is_some() {
1528 let sid = resp.session.id.to_string();
1529 check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1530 }
1531 attach_session(backend_id)?;
1532 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1533 }
1534 Ok(msg)
1535 }
1536 Commands::Stop { names, purge } => {
1537 let mut results = Vec::new();
1538 for name in names {
1539 let query = if *purge { "?purge=true" } else { "" };
1540 let resp = authed_post(
1541 &client,
1542 format!("{url}/api/v1/sessions/{name}/stop{query}"),
1543 token.as_deref(),
1544 )
1545 .send()
1546 .await
1547 .map_err(|e| friendly_error(&e, node))?;
1548 let action = if *purge {
1549 "stopped and purged"
1550 } else {
1551 "stopped"
1552 };
1553 match ok_or_api_error(resp).await {
1554 Ok(_) => results.push(format!("Session {name} {action}.")),
1555 Err(e) => results.push(format!("Error stopping {name}: {e}")),
1556 }
1557 }
1558 Ok(results.join("\n"))
1559 }
1560 Commands::Cleanup => {
1561 let resp = authed_post(
1562 &client,
1563 format!("{url}/api/v1/sessions/cleanup"),
1564 token.as_deref(),
1565 )
1566 .send()
1567 .await
1568 .map_err(|e| friendly_error(&e, node))?;
1569 let text = ok_or_api_error(resp).await?;
1570 let result: serde_json::Value = serde_json::from_str(&text)?;
1571 let count = result["deleted"].as_u64().unwrap_or(0);
1572 if count == 0 {
1573 Ok("No stopped or lost sessions to clean up.".into())
1574 } else {
1575 Ok(format!("Cleaned up {count} session(s)."))
1576 }
1577 }
1578 Commands::Logs {
1579 name,
1580 lines,
1581 follow,
1582 } => {
1583 if *follow {
1584 let mut stdout = std::io::stdout();
1585 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1586 .await
1587 .map_err(|e| {
1588 match e.downcast::<reqwest::Error>() {
1590 Ok(re) => friendly_error(&re, node),
1591 Err(other) => other,
1592 }
1593 })?;
1594 Ok(String::new())
1595 } else {
1596 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1597 .await
1598 .map_err(|e| match e.downcast::<reqwest::Error>() {
1599 Ok(re) => friendly_error(&re, node),
1600 Err(other) => other,
1601 })?;
1602 Ok(output)
1603 }
1604 }
1605 Commands::Interventions { name } => {
1606 let resp = authed_get(
1607 &client,
1608 format!("{url}/api/v1/sessions/{name}/interventions"),
1609 token.as_deref(),
1610 )
1611 .send()
1612 .await
1613 .map_err(|e| friendly_error(&e, node))?;
1614 let text = ok_or_api_error(resp).await?;
1615 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1616 Ok(format_interventions(&events))
1617 }
1618 Commands::Ui => {
1619 let dashboard = base_url(node);
1620 open_browser(&dashboard)?;
1621 Ok(format!("Opening {dashboard}"))
1622 }
1623 Commands::Resume { name } => {
1624 let resp = authed_post(
1625 &client,
1626 format!("{url}/api/v1/sessions/{name}/resume"),
1627 token.as_deref(),
1628 )
1629 .send()
1630 .await
1631 .map_err(|e| friendly_error(&e, node))?;
1632 let text = ok_or_api_error(resp).await?;
1633 let session: Session = serde_json::from_str(&text)?;
1634 let backend_id = session
1635 .backend_session_id
1636 .as_deref()
1637 .unwrap_or(&session.name);
1638 eprintln!("Resumed session \"{}\"", session.name);
1639 let sid = session.id.to_string();
1640 check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1641 attach_session(backend_id)?;
1642 Ok(format!("Detached from session \"{}\".", session.name))
1643 }
1644 Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1645 .await
1646 .map_err(|e| match e.downcast::<reqwest::Error>() {
1647 Ok(re) => friendly_error(&re, node),
1648 Err(other) => other,
1649 }),
1650 Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
1651 .await
1652 .map_err(|e| match e.downcast::<reqwest::Error>() {
1653 Ok(re) => friendly_error(&re, node),
1654 Err(other) => other,
1655 }),
1656 Commands::Worktree { action } => execute_worktree(&client, action, &url, token.as_deref())
1657 .await
1658 .map_err(|e| match e.downcast::<reqwest::Error>() {
1659 Ok(re) => friendly_error(&re, node),
1660 Err(other) => other,
1661 }),
1662 }
1663}
1664
1665#[cfg(test)]
1666mod tests {
1667 use super::*;
1668
1669 fn repo_session(workdir: &str, branch: Option<&str>) -> Session {
1671 Session {
1672 id: uuid::Uuid::nil(),
1673 name: "test".into(),
1674 workdir: workdir.into(),
1675 command: "echo".into(),
1676 description: None,
1677 status: pulpo_common::session::SessionStatus::Active,
1678 exit_code: None,
1679 backend_session_id: None,
1680 output_snapshot: None,
1681 metadata: None,
1682 ink: None,
1683 intervention_code: None,
1684 intervention_reason: None,
1685 intervention_at: None,
1686 last_output_at: None,
1687 idle_since: None,
1688 idle_threshold_secs: None,
1689 worktree_path: None,
1690 worktree_branch: None,
1691 git_branch: branch.map(Into::into),
1692 git_commit: None,
1693 git_files_changed: None,
1694 git_insertions: None,
1695 git_deletions: None,
1696 git_ahead: None,
1697 runtime: Runtime::Tmux,
1698 created_at: chrono::Utc::now(),
1699 updated_at: chrono::Utc::now(),
1700 }
1701 }
1702
1703 #[test]
1704 fn test_base_url() {
1705 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1706 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1707 }
1708
1709 #[test]
1710 fn test_cli_parse_list() {
1711 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1712 assert_eq!(cli.node, "localhost:7433");
1713 assert!(matches!(cli.command, Some(Commands::List { .. })));
1714 }
1715
1716 #[test]
1717 fn test_cli_parse_nodes() {
1718 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1719 assert!(matches!(cli.command, Some(Commands::Nodes)));
1720 }
1721
1722 #[test]
1723 fn test_cli_parse_ui() {
1724 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1725 assert!(matches!(cli.command, Some(Commands::Ui)));
1726 }
1727
1728 #[test]
1729 fn test_cli_parse_ui_custom_node() {
1730 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1731 assert_eq!(cli.node, "mac-mini:7433");
1733 assert_eq!(cli.path.as_deref(), Some("ui"));
1734 }
1735
1736 #[test]
1737 fn test_build_open_command() {
1738 let cmd = build_open_command("http://localhost:7433");
1739 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1740 assert_eq!(args, vec!["http://localhost:7433"]);
1741 #[cfg(target_os = "macos")]
1742 assert_eq!(cmd.get_program(), "open");
1743 #[cfg(target_os = "linux")]
1744 assert_eq!(cmd.get_program(), "xdg-open");
1745 }
1746
1747 #[test]
1748 fn test_cli_parse_spawn() {
1749 let cli = Cli::try_parse_from([
1750 "pulpo",
1751 "spawn",
1752 "my-task",
1753 "--workdir",
1754 "/tmp/repo",
1755 "--",
1756 "claude",
1757 "-p",
1758 "Fix the bug",
1759 ])
1760 .unwrap();
1761 assert!(matches!(
1762 &cli.command,
1763 Some(Commands::Spawn { name, workdir, command, .. })
1764 if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1765 && command == &["claude", "-p", "Fix the bug"]
1766 ));
1767 }
1768
1769 #[test]
1770 fn test_cli_parse_spawn_with_ink() {
1771 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1772 assert!(matches!(
1773 &cli.command,
1774 Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1775 ));
1776 }
1777
1778 #[test]
1779 fn test_cli_parse_spawn_with_description() {
1780 let cli =
1781 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1782 .unwrap();
1783 assert!(matches!(
1784 &cli.command,
1785 Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1786 ));
1787 }
1788
1789 #[test]
1790 fn test_cli_parse_spawn_name_positional() {
1791 let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1792 assert!(matches!(
1793 &cli.command,
1794 Some(Commands::Spawn { name, command, .. })
1795 if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1796 ));
1797 }
1798
1799 #[test]
1800 fn test_cli_parse_spawn_no_command() {
1801 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1802 assert!(matches!(
1803 &cli.command,
1804 Some(Commands::Spawn { command, .. }) if command.is_empty()
1805 ));
1806 }
1807
1808 #[test]
1809 fn test_cli_parse_spawn_idle_threshold() {
1810 let cli =
1811 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1812 assert!(matches!(
1813 &cli.command,
1814 Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1815 ));
1816 }
1817
1818 #[test]
1819 fn test_cli_parse_spawn_idle_threshold_60() {
1820 let cli =
1821 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "60"]).unwrap();
1822 assert!(matches!(
1823 &cli.command,
1824 Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(60)
1825 ));
1826 }
1827
1828 #[test]
1829 fn test_cli_parse_spawn_secrets() {
1830 let cli = Cli::try_parse_from([
1831 "pulpo",
1832 "spawn",
1833 "my-task",
1834 "--secret",
1835 "GITHUB_TOKEN",
1836 "--secret",
1837 "NPM_TOKEN",
1838 ])
1839 .unwrap();
1840 assert!(matches!(
1841 &cli.command,
1842 Some(Commands::Spawn { secret, .. }) if secret == &["GITHUB_TOKEN", "NPM_TOKEN"]
1843 ));
1844 }
1845
1846 #[test]
1847 fn test_cli_parse_spawn_no_secrets() {
1848 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1849 assert!(matches!(
1850 &cli.command,
1851 Some(Commands::Spawn { secret, .. }) if secret.is_empty()
1852 ));
1853 }
1854
1855 #[test]
1856 fn test_cli_parse_secret_set_with_env() {
1857 let cli = Cli::try_parse_from([
1858 "pulpo",
1859 "secret",
1860 "set",
1861 "GH_WORK",
1862 "token123",
1863 "--env",
1864 "GITHUB_TOKEN",
1865 ])
1866 .unwrap();
1867 assert!(matches!(
1868 &cli.command,
1869 Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1870 if name == "GH_WORK" && value == "token123" && env.as_deref() == Some("GITHUB_TOKEN")
1871 ));
1872 }
1873
1874 #[test]
1875 fn test_cli_parse_secret_set_without_env() {
1876 let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_KEY", "val"]).unwrap();
1877 assert!(matches!(
1878 &cli.command,
1879 Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1880 if name == "MY_KEY" && value == "val" && env.is_none()
1881 ));
1882 }
1883
1884 #[test]
1885 fn test_cli_parse_spawn_worktree() {
1886 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree"]).unwrap();
1887 assert!(matches!(
1888 &cli.command,
1889 Some(Commands::Spawn { worktree, .. }) if *worktree
1890 ));
1891 }
1892
1893 #[test]
1894 fn test_cli_parse_spawn_worktree_base() {
1895 let cli = Cli::try_parse_from([
1896 "pulpo",
1897 "spawn",
1898 "my-task",
1899 "--worktree",
1900 "--worktree-base",
1901 "main",
1902 ])
1903 .unwrap();
1904 assert!(matches!(
1905 &cli.command,
1906 Some(Commands::Spawn { worktree, worktree_base, .. })
1907 if *worktree && worktree_base.as_deref() == Some("main")
1908 ));
1909 }
1910
1911 #[test]
1912 fn test_cli_parse_spawn_worktree_base_implies_worktree() {
1913 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree-base", "develop"])
1915 .unwrap();
1916 assert!(matches!(
1917 &cli.command,
1918 Some(Commands::Spawn { worktree_base, .. })
1919 if worktree_base.as_deref() == Some("develop")
1920 ));
1921 }
1922
1923 #[test]
1924 fn test_cli_parse_worktree_list() {
1925 let cli = Cli::try_parse_from(["pulpo", "worktree", "list"]).unwrap();
1926 assert!(matches!(
1927 &cli.command,
1928 Some(Commands::Worktree {
1929 action: WorktreeAction::List
1930 })
1931 ));
1932 }
1933
1934 #[test]
1935 fn test_cli_parse_wt_alias() {
1936 let cli = Cli::try_parse_from(["pulpo", "wt", "list"]).unwrap();
1937 assert!(matches!(
1938 &cli.command,
1939 Some(Commands::Worktree {
1940 action: WorktreeAction::List
1941 })
1942 ));
1943 }
1944
1945 #[test]
1946 fn test_cli_parse_worktree_list_ls_alias() {
1947 let cli = Cli::try_parse_from(["pulpo", "wt", "ls"]).unwrap();
1948 assert!(matches!(
1949 &cli.command,
1950 Some(Commands::Worktree {
1951 action: WorktreeAction::List
1952 })
1953 ));
1954 }
1955
1956 #[test]
1957 fn test_format_worktree_sessions_empty() {
1958 let output = format_worktree_sessions(&[]);
1959 assert_eq!(output, "No worktree sessions.");
1960 }
1961
1962 #[test]
1963 fn test_format_worktree_sessions_with_data() {
1964 use chrono::Utc;
1965 use pulpo_common::session::SessionStatus;
1966 use uuid::Uuid;
1967
1968 let session = Session {
1969 id: Uuid::nil(),
1970 name: "fix-auth".into(),
1971 workdir: "/tmp/repo".into(),
1972 command: "claude -p 'fix auth'".into(),
1973 description: None,
1974 status: SessionStatus::Active,
1975 exit_code: None,
1976 backend_session_id: None,
1977 output_snapshot: None,
1978 metadata: None,
1979 ink: None,
1980 intervention_code: None,
1981 intervention_reason: None,
1982 intervention_at: None,
1983 last_output_at: None,
1984 idle_since: None,
1985 idle_threshold_secs: None,
1986 worktree_path: Some("/home/user/.pulpo/worktrees/fix-auth".into()),
1987 worktree_branch: Some("fix-auth".into()),
1988 git_branch: None,
1989 git_commit: None,
1990 git_files_changed: None,
1991 git_insertions: None,
1992 git_deletions: None,
1993 git_ahead: None,
1994 runtime: Runtime::Tmux,
1995 created_at: Utc::now(),
1996 updated_at: Utc::now(),
1997 };
1998 let sessions = vec![&session];
1999 let output = format_worktree_sessions(&sessions);
2000 assert!(output.contains("fix-auth"), "should show name: {output}");
2001 assert!(output.contains("active"), "should show status: {output}");
2002 assert!(
2003 output.contains("/home/user/.pulpo/worktrees/fix-auth"),
2004 "should show path: {output}"
2005 );
2006 assert!(output.contains("BRANCH"), "should have header: {output}");
2007 }
2008
2009 #[test]
2010 fn test_format_worktree_sessions_no_branch() {
2011 use chrono::Utc;
2012 use pulpo_common::session::SessionStatus;
2013 use uuid::Uuid;
2014
2015 let session = Session {
2016 id: Uuid::nil(),
2017 name: "old-session".into(),
2018 workdir: "/tmp".into(),
2019 command: "echo".into(),
2020 description: None,
2021 status: SessionStatus::Active,
2022 exit_code: None,
2023 backend_session_id: None,
2024 output_snapshot: None,
2025 metadata: None,
2026 ink: None,
2027 intervention_code: None,
2028 intervention_reason: None,
2029 intervention_at: None,
2030 last_output_at: None,
2031 idle_since: None,
2032 idle_threshold_secs: None,
2033 worktree_path: Some("/home/user/.pulpo/worktrees/old-session".into()),
2034 worktree_branch: None,
2035 git_branch: None,
2036 git_commit: None,
2037 git_files_changed: None,
2038 git_insertions: None,
2039 git_deletions: None,
2040 git_ahead: None,
2041 runtime: Runtime::Tmux,
2042 created_at: Utc::now(),
2043 updated_at: Utc::now(),
2044 };
2045 let sessions = vec![&session];
2046 let output = format_worktree_sessions(&sessions);
2047 assert!(
2048 output.contains('-'),
2049 "branch should show dash when None: {output}"
2050 );
2051 }
2052
2053 #[test]
2054 fn test_cli_parse_spawn_detach() {
2055 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
2056 assert!(matches!(
2057 &cli.command,
2058 Some(Commands::Spawn { detach, .. }) if *detach
2059 ));
2060 }
2061
2062 #[test]
2063 fn test_cli_parse_spawn_detach_short() {
2064 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
2065 assert!(matches!(
2066 &cli.command,
2067 Some(Commands::Spawn { detach, .. }) if *detach
2068 ));
2069 }
2070
2071 #[test]
2072 fn test_cli_parse_spawn_detach_default() {
2073 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
2074 assert!(matches!(
2075 &cli.command,
2076 Some(Commands::Spawn { detach, .. }) if !detach
2077 ));
2078 }
2079
2080 #[test]
2081 fn test_cli_parse_logs() {
2082 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
2083 assert!(matches!(
2084 &cli.command,
2085 Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 100 && !follow
2086 ));
2087 }
2088
2089 #[test]
2090 fn test_cli_parse_logs_with_lines() {
2091 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
2092 assert!(matches!(
2093 &cli.command,
2094 Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 50 && !follow
2095 ));
2096 }
2097
2098 #[test]
2099 fn test_cli_parse_logs_follow() {
2100 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
2101 assert!(matches!(
2102 &cli.command,
2103 Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
2104 ));
2105 }
2106
2107 #[test]
2108 fn test_cli_parse_logs_follow_short() {
2109 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
2110 assert!(matches!(
2111 &cli.command,
2112 Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
2113 ));
2114 }
2115
2116 #[test]
2117 fn test_cli_parse_stop() {
2118 let cli = Cli::try_parse_from(["pulpo", "stop", "my-session"]).unwrap();
2119 assert!(matches!(
2120 &cli.command,
2121 Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
2122 ));
2123 }
2124
2125 #[test]
2126 fn test_cli_parse_stop_purge() {
2127 let cli = Cli::try_parse_from(["pulpo", "stop", "my-session", "--purge"]).unwrap();
2128 assert!(matches!(
2129 &cli.command,
2130 Some(Commands::Stop { names, purge }) if names == &["my-session"] && *purge
2131 ));
2132
2133 let cli = Cli::try_parse_from(["pulpo", "stop", "my-session", "-p"]).unwrap();
2134 assert!(matches!(
2135 &cli.command,
2136 Some(Commands::Stop { names, purge }) if names == &["my-session"] && *purge
2137 ));
2138 }
2139
2140 #[test]
2141 fn test_cli_parse_kill_alias() {
2142 let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
2143 assert!(matches!(
2144 &cli.command,
2145 Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
2146 ));
2147 }
2148
2149 #[test]
2150 fn test_cli_parse_resume() {
2151 let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
2152 assert!(matches!(
2153 &cli.command,
2154 Some(Commands::Resume { name }) if name == "my-session"
2155 ));
2156 }
2157
2158 #[test]
2159 fn test_cli_parse_input() {
2160 let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
2161 assert!(matches!(
2162 &cli.command,
2163 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("yes")
2164 ));
2165 }
2166
2167 #[test]
2168 fn test_cli_parse_input_no_text() {
2169 let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
2170 assert!(matches!(
2171 &cli.command,
2172 Some(Commands::Input { name, text }) if name == "my-session" && text.is_none()
2173 ));
2174 }
2175
2176 #[test]
2177 fn test_cli_parse_input_alias() {
2178 let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
2179 assert!(matches!(
2180 &cli.command,
2181 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
2182 ));
2183 }
2184
2185 #[test]
2186 fn test_cli_parse_custom_node() {
2187 let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
2188 assert_eq!(cli.node, "win-pc:8080");
2189 assert_eq!(cli.path.as_deref(), Some("list"));
2191 }
2192
2193 #[test]
2194 fn test_cli_version() {
2195 let result = Cli::try_parse_from(["pulpo", "--version"]);
2196 let err = result.unwrap_err();
2198 assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
2199 }
2200
2201 #[test]
2202 fn test_cli_parse_no_subcommand_succeeds() {
2203 let cli = Cli::try_parse_from(["pulpo"]).unwrap();
2204 assert!(cli.command.is_none());
2205 assert!(cli.path.is_none());
2206 }
2207
2208 #[test]
2209 fn test_cli_debug() {
2210 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2211 let debug = format!("{cli:?}");
2212 assert!(debug.contains("List"));
2213 }
2214
2215 #[test]
2216 fn test_commands_debug() {
2217 let cmd = Commands::List { all: false };
2218 assert_eq!(format!("{cmd:?}"), "List { all: false }");
2219 }
2220
2221 const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2223
2224 fn test_create_response_json() -> String {
2226 format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
2227 }
2228
2229 async fn start_test_server() -> String {
2231 use axum::http::StatusCode;
2232 use axum::{
2233 Json, Router,
2234 routing::{get, post},
2235 };
2236
2237 let create_json = test_create_response_json();
2238
2239 let app = Router::new()
2240 .route(
2241 "/api/v1/sessions",
2242 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
2243 (StatusCode::CREATED, create_json.clone())
2244 }),
2245 )
2246 .route(
2247 "/api/v1/sessions/{id}",
2248 get(|| async { TEST_SESSION_JSON.to_owned() }),
2249 )
2250 .route(
2251 "/api/v1/sessions/{id}/stop",
2252 post(|| async { StatusCode::NO_CONTENT }),
2253 )
2254 .route(
2255 "/api/v1/sessions/{id}/output",
2256 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
2257 )
2258 .route(
2259 "/api/v1/peers",
2260 get(|| async {
2261 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
2262 }),
2263 )
2264 .route(
2265 "/api/v1/sessions/{id}/resume",
2266 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
2267 )
2268 .route(
2269 "/api/v1/sessions/{id}/interventions",
2270 get(|| async { "[]".to_owned() }),
2271 )
2272 .route(
2273 "/api/v1/sessions/{id}/input",
2274 post(|| async { StatusCode::NO_CONTENT }),
2275 )
2276 .route(
2277 "/api/v1/schedules",
2278 get(|| async { Json::<Vec<()>>(vec![]) })
2279 .post(|| async { StatusCode::CREATED }),
2280 )
2281 .route(
2282 "/api/v1/schedules/{id}",
2283 axum::routing::put(|| async { StatusCode::OK })
2284 .delete(|| async { StatusCode::NO_CONTENT }),
2285 );
2286
2287 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2288 let addr = listener.local_addr().unwrap();
2289 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2290 format!("127.0.0.1:{}", addr.port())
2291 }
2292
2293 #[tokio::test]
2294 async fn test_execute_list_success() {
2295 let node = start_test_server().await;
2296 let cli = Cli {
2297 node,
2298 token: None,
2299 command: Some(Commands::List { all: false }),
2300 path: None,
2301 };
2302 let result = execute(&cli).await.unwrap();
2303 assert_eq!(result, "No sessions.");
2304 }
2305
2306 #[tokio::test]
2307 async fn test_execute_nodes_success() {
2308 let node = start_test_server().await;
2309 let cli = Cli {
2310 node,
2311 token: None,
2312 command: Some(Commands::Nodes),
2313 path: None,
2314 };
2315 let result = execute(&cli).await.unwrap();
2316 assert!(result.contains("test"));
2317 assert!(result.contains("(local)"));
2318 assert!(result.contains("NAME"));
2319 }
2320
2321 #[tokio::test]
2322 async fn test_execute_spawn_success() {
2323 let node = start_test_server().await;
2324 let cli = Cli {
2325 node,
2326 token: None,
2327 command: Some(Commands::Spawn {
2328 name: Some("test".into()),
2329 workdir: Some("/tmp/repo".into()),
2330 ink: None,
2331 description: None,
2332 detach: true,
2333 idle_threshold: None,
2334 auto: false,
2335 worktree: false,
2336 worktree_base: None,
2337 runtime: None,
2338 secret: vec![],
2339 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2340 }),
2341 path: None,
2342 };
2343 let result = execute(&cli).await.unwrap();
2344 assert!(result.contains("Created session"));
2345 assert!(result.contains("repo"));
2346 }
2347
2348 #[tokio::test]
2349 async fn test_execute_spawn_with_all_flags() {
2350 let node = start_test_server().await;
2351 let cli = Cli {
2352 node,
2353 token: None,
2354 command: Some(Commands::Spawn {
2355 name: Some("test".into()),
2356 workdir: Some("/tmp/repo".into()),
2357 ink: Some("coder".into()),
2358 description: Some("Fix the bug".into()),
2359 detach: true,
2360 idle_threshold: None,
2361 auto: false,
2362 worktree: false,
2363 worktree_base: None,
2364 runtime: None,
2365 secret: vec![],
2366 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2367 }),
2368 path: None,
2369 };
2370 let result = execute(&cli).await.unwrap();
2371 assert!(result.contains("Created session"));
2372 }
2373
2374 #[tokio::test]
2375 async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
2376 let node = start_test_server().await;
2377 let cli = Cli {
2378 node,
2379 token: None,
2380 command: Some(Commands::Spawn {
2381 name: Some("full-opts".into()),
2382 workdir: Some("/tmp/repo".into()),
2383 ink: Some("coder".into()),
2384 description: Some("Full options".into()),
2385 detach: true,
2386 idle_threshold: Some(120),
2387 auto: false,
2388 worktree: true,
2389 worktree_base: None,
2390 runtime: Some("docker".into()),
2391 secret: vec![],
2392 command: vec!["claude".into()],
2393 }),
2394 path: None,
2395 };
2396 let result = execute(&cli).await.unwrap();
2397 assert!(result.contains("Created session"));
2398 }
2399
2400 #[tokio::test]
2401 async fn test_execute_spawn_no_name_derives_from_workdir() {
2402 let node = start_test_server().await;
2403 let cli = Cli {
2404 node,
2405 token: None,
2406 command: Some(Commands::Spawn {
2407 name: None,
2408 workdir: Some("/tmp/my-project".into()),
2409 ink: None,
2410 description: None,
2411 detach: true,
2412 idle_threshold: None,
2413 auto: false,
2414 worktree: false,
2415 worktree_base: None,
2416 runtime: None,
2417 secret: vec![],
2418 command: vec!["echo".into(), "hello".into()],
2419 }),
2420 path: None,
2421 };
2422 let result = execute(&cli).await.unwrap();
2423 assert!(result.contains("Created session"));
2424 }
2425
2426 #[tokio::test]
2427 async fn test_execute_spawn_no_command() {
2428 let node = start_test_server().await;
2429 let cli = Cli {
2430 node,
2431 token: None,
2432 command: Some(Commands::Spawn {
2433 name: Some("test".into()),
2434 workdir: Some("/tmp/repo".into()),
2435 ink: None,
2436 description: None,
2437 detach: true,
2438 idle_threshold: None,
2439 auto: false,
2440 worktree: false,
2441 worktree_base: None,
2442 runtime: None,
2443 secret: vec![],
2444 command: vec![],
2445 }),
2446 path: None,
2447 };
2448 let result = execute(&cli).await.unwrap();
2449 assert!(result.contains("Created session"));
2450 }
2451
2452 #[tokio::test]
2453 async fn test_execute_spawn_with_name() {
2454 let node = start_test_server().await;
2455 let cli = Cli {
2456 node,
2457 token: None,
2458 command: Some(Commands::Spawn {
2459 name: Some("my-task".into()),
2460 workdir: Some("/tmp/repo".into()),
2461 ink: None,
2462 description: None,
2463 detach: true,
2464 idle_threshold: None,
2465 auto: false,
2466 worktree: false,
2467 worktree_base: None,
2468 runtime: None,
2469 secret: vec![],
2470 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2471 }),
2472 path: None,
2473 };
2474 let result = execute(&cli).await.unwrap();
2475 assert!(result.contains("Created session"));
2476 }
2477
2478 #[tokio::test]
2479 async fn test_execute_spawn_auto_attach() {
2480 let node = start_test_server().await;
2481 let cli = Cli {
2482 node,
2483 token: None,
2484 command: Some(Commands::Spawn {
2485 name: Some("test".into()),
2486 workdir: Some("/tmp/repo".into()),
2487 ink: None,
2488 description: None,
2489 detach: false,
2490 idle_threshold: None,
2491 auto: false,
2492 worktree: false,
2493 worktree_base: None,
2494 runtime: None,
2495 secret: vec![],
2496 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2497 }),
2498 path: None,
2499 };
2500 let result = execute(&cli).await.unwrap();
2501 assert!(result.contains("Detached from session"));
2503 }
2504
2505 #[tokio::test]
2506 async fn test_execute_stop_success() {
2507 let node = start_test_server().await;
2508 let cli = Cli {
2509 node,
2510 token: None,
2511 command: Some(Commands::Stop {
2512 names: vec!["test-session".into()],
2513 purge: false,
2514 }),
2515 path: None,
2516 };
2517 let result = execute(&cli).await.unwrap();
2518 assert!(result.contains("stopped"));
2519 assert!(!result.contains("purged"));
2520 }
2521
2522 #[tokio::test]
2523 async fn test_execute_stop_with_purge() {
2524 let node = start_test_server().await;
2525 let cli = Cli {
2526 node,
2527 token: None,
2528 command: Some(Commands::Stop {
2529 names: vec!["test-session".into()],
2530 purge: true,
2531 }),
2532 path: None,
2533 };
2534 let result = execute(&cli).await.unwrap();
2535 assert!(result.contains("stopped and purged"));
2536 }
2537
2538 #[tokio::test]
2539 async fn test_execute_logs_success() {
2540 let node = start_test_server().await;
2541 let cli = Cli {
2542 node,
2543 token: None,
2544 command: Some(Commands::Logs {
2545 name: "test-session".into(),
2546 lines: 50,
2547 follow: false,
2548 }),
2549 path: None,
2550 };
2551 let result = execute(&cli).await.unwrap();
2552 assert!(result.contains("test output"));
2553 }
2554
2555 #[tokio::test]
2556 async fn test_execute_list_connection_refused() {
2557 let cli = Cli {
2558 node: "localhost:1".into(),
2559 token: None,
2560 command: Some(Commands::List { all: false }),
2561 path: None,
2562 };
2563 let result = execute(&cli).await;
2564 let err = result.unwrap_err().to_string();
2565 assert!(
2566 err.contains("Could not connect to pulpod"),
2567 "Expected friendly error, got: {err}"
2568 );
2569 assert!(err.contains("localhost:1"));
2570 }
2571
2572 #[tokio::test]
2573 async fn test_execute_nodes_connection_refused() {
2574 let cli = Cli {
2575 node: "localhost:1".into(),
2576 token: None,
2577 command: Some(Commands::Nodes),
2578 path: None,
2579 };
2580 let result = execute(&cli).await;
2581 let err = result.unwrap_err().to_string();
2582 assert!(err.contains("Could not connect to pulpod"));
2583 }
2584
2585 #[tokio::test]
2586 async fn test_execute_stop_error_response() {
2587 use axum::{Router, http::StatusCode, routing::post};
2588
2589 let app = Router::new().route(
2590 "/api/v1/sessions/{id}/stop",
2591 post(|| async {
2592 (
2593 StatusCode::NOT_FOUND,
2594 "{\"error\":\"session not found: test-session\"}",
2595 )
2596 }),
2597 );
2598 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2599 let addr = listener.local_addr().unwrap();
2600 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2601 let node = format!("127.0.0.1:{}", addr.port());
2602
2603 let cli = Cli {
2604 node,
2605 token: None,
2606 command: Some(Commands::Stop {
2607 names: vec!["test-session".into()],
2608 purge: false,
2609 }),
2610 path: None,
2611 };
2612 let result = execute(&cli).await.unwrap();
2613 assert!(result.contains("Error stopping test-session"), "{result}");
2614 }
2615
2616 #[tokio::test]
2617 async fn test_execute_logs_error_response() {
2618 use axum::{Router, http::StatusCode, routing::get};
2619
2620 let app = Router::new().route(
2621 "/api/v1/sessions/{id}/output",
2622 get(|| async {
2623 (
2624 StatusCode::NOT_FOUND,
2625 "{\"error\":\"session not found: ghost\"}",
2626 )
2627 }),
2628 );
2629 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2630 let addr = listener.local_addr().unwrap();
2631 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2632 let node = format!("127.0.0.1:{}", addr.port());
2633
2634 let cli = Cli {
2635 node,
2636 token: None,
2637 command: Some(Commands::Logs {
2638 name: "ghost".into(),
2639 lines: 50,
2640 follow: false,
2641 }),
2642 path: None,
2643 };
2644 let err = execute(&cli).await.unwrap_err();
2645 assert_eq!(err.to_string(), "session not found: ghost");
2646 }
2647
2648 #[tokio::test]
2649 async fn test_execute_resume_error_response() {
2650 use axum::{Router, http::StatusCode, routing::post};
2651
2652 let app = Router::new().route(
2653 "/api/v1/sessions/{id}/resume",
2654 post(|| async {
2655 (
2656 StatusCode::BAD_REQUEST,
2657 "{\"error\":\"session is not lost (status: active)\"}",
2658 )
2659 }),
2660 );
2661 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2662 let addr = listener.local_addr().unwrap();
2663 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2664 let node = format!("127.0.0.1:{}", addr.port());
2665
2666 let cli = Cli {
2667 node,
2668 token: None,
2669 command: Some(Commands::Resume {
2670 name: "test-session".into(),
2671 }),
2672 path: None,
2673 };
2674 let err = execute(&cli).await.unwrap_err();
2675 assert_eq!(err.to_string(), "session is not lost (status: active)");
2676 }
2677
2678 #[tokio::test]
2679 async fn test_execute_spawn_error_response() {
2680 use axum::{Router, http::StatusCode, routing::post};
2681
2682 let app = Router::new().route(
2683 "/api/v1/sessions",
2684 post(|| async {
2685 (
2686 StatusCode::INTERNAL_SERVER_ERROR,
2687 "{\"error\":\"failed to spawn session\"}",
2688 )
2689 }),
2690 );
2691 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2692 let addr = listener.local_addr().unwrap();
2693 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2694 let node = format!("127.0.0.1:{}", addr.port());
2695
2696 let cli = Cli {
2697 node,
2698 token: None,
2699 command: Some(Commands::Spawn {
2700 name: Some("test".into()),
2701 workdir: Some("/tmp/repo".into()),
2702 ink: None,
2703 description: None,
2704 detach: true,
2705 idle_threshold: None,
2706 auto: false,
2707 worktree: false,
2708 worktree_base: None,
2709 runtime: None,
2710 secret: vec![],
2711 command: vec!["test".into()],
2712 }),
2713 path: None,
2714 };
2715 let err = execute(&cli).await.unwrap_err();
2716 assert_eq!(err.to_string(), "failed to spawn session");
2717 }
2718
2719 #[tokio::test]
2720 async fn test_execute_interventions_error_response() {
2721 use axum::{Router, http::StatusCode, routing::get};
2722
2723 let app = Router::new().route(
2724 "/api/v1/sessions/{id}/interventions",
2725 get(|| async {
2726 (
2727 StatusCode::NOT_FOUND,
2728 "{\"error\":\"session not found: ghost\"}",
2729 )
2730 }),
2731 );
2732 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2733 let addr = listener.local_addr().unwrap();
2734 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2735 let node = format!("127.0.0.1:{}", addr.port());
2736
2737 let cli = Cli {
2738 node,
2739 token: None,
2740 command: Some(Commands::Interventions {
2741 name: "ghost".into(),
2742 }),
2743 path: None,
2744 };
2745 let err = execute(&cli).await.unwrap_err();
2746 assert_eq!(err.to_string(), "session not found: ghost");
2747 }
2748
2749 #[tokio::test]
2750 async fn test_execute_resume_success() {
2751 let node = start_test_server().await;
2752 let cli = Cli {
2753 node,
2754 token: None,
2755 command: Some(Commands::Resume {
2756 name: "test-session".into(),
2757 }),
2758 path: None,
2759 };
2760 let result = execute(&cli).await.unwrap();
2761 assert!(result.contains("Detached from session"));
2762 }
2763
2764 #[tokio::test]
2765 async fn test_execute_input_success() {
2766 let node = start_test_server().await;
2767 let cli = Cli {
2768 node,
2769 token: None,
2770 command: Some(Commands::Input {
2771 name: "test-session".into(),
2772 text: Some("yes".into()),
2773 }),
2774 path: None,
2775 };
2776 let result = execute(&cli).await.unwrap();
2777 assert!(result.contains("Sent input to session test-session"));
2778 }
2779
2780 #[tokio::test]
2781 async fn test_execute_input_no_text() {
2782 let node = start_test_server().await;
2783 let cli = Cli {
2784 node,
2785 token: None,
2786 command: Some(Commands::Input {
2787 name: "test-session".into(),
2788 text: None,
2789 }),
2790 path: None,
2791 };
2792 let result = execute(&cli).await.unwrap();
2793 assert!(result.contains("Sent input to session test-session"));
2794 }
2795
2796 #[tokio::test]
2797 async fn test_execute_input_connection_refused() {
2798 let cli = Cli {
2799 node: "localhost:1".into(),
2800 token: None,
2801 command: Some(Commands::Input {
2802 name: "test".into(),
2803 text: Some("y".into()),
2804 }),
2805 path: None,
2806 };
2807 let result = execute(&cli).await;
2808 let err = result.unwrap_err().to_string();
2809 assert!(err.contains("Could not connect to pulpod"));
2810 }
2811
2812 #[tokio::test]
2813 async fn test_execute_input_error_response() {
2814 use axum::{Router, http::StatusCode, routing::post};
2815
2816 let app = Router::new().route(
2817 "/api/v1/sessions/{id}/input",
2818 post(|| async {
2819 (
2820 StatusCode::NOT_FOUND,
2821 "{\"error\":\"session not found: ghost\"}",
2822 )
2823 }),
2824 );
2825 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2826 let addr = listener.local_addr().unwrap();
2827 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2828 let node = format!("127.0.0.1:{}", addr.port());
2829
2830 let cli = Cli {
2831 node,
2832 token: None,
2833 command: Some(Commands::Input {
2834 name: "ghost".into(),
2835 text: Some("y".into()),
2836 }),
2837 path: None,
2838 };
2839 let err = execute(&cli).await.unwrap_err();
2840 assert_eq!(err.to_string(), "session not found: ghost");
2841 }
2842
2843 #[tokio::test]
2844 async fn test_execute_ui() {
2845 let cli = Cli {
2846 node: "localhost:7433".into(),
2847 token: None,
2848 command: Some(Commands::Ui),
2849 path: None,
2850 };
2851 let result = execute(&cli).await.unwrap();
2852 assert!(result.contains("Opening"));
2853 assert!(result.contains("http://localhost:7433"));
2854 }
2855
2856 #[tokio::test]
2857 async fn test_execute_ui_custom_node() {
2858 let cli = Cli {
2859 node: "mac-mini:7433".into(),
2860 token: None,
2861 command: Some(Commands::Ui),
2862 path: None,
2863 };
2864 let result = execute(&cli).await.unwrap();
2865 assert!(result.contains("http://mac-mini:7433"));
2866 }
2867
2868 #[test]
2869 fn test_format_sessions_empty() {
2870 assert_eq!(format_sessions(&[]), "No sessions.");
2871 }
2872
2873 #[test]
2874 fn test_format_sessions_with_data() {
2875 use chrono::Utc;
2876 use pulpo_common::session::SessionStatus;
2877 use uuid::Uuid;
2878
2879 let sessions = vec![Session {
2880 id: Uuid::nil(),
2881 name: "my-api".into(),
2882 workdir: "/tmp/repo".into(),
2883 command: "claude -p 'Fix the bug'".into(),
2884 description: Some("Fix the bug".into()),
2885 status: SessionStatus::Active,
2886 exit_code: None,
2887 backend_session_id: None,
2888 output_snapshot: None,
2889 metadata: None,
2890 ink: None,
2891 intervention_code: None,
2892 intervention_reason: None,
2893 intervention_at: None,
2894 last_output_at: None,
2895 idle_since: None,
2896 idle_threshold_secs: None,
2897 worktree_path: None,
2898 worktree_branch: None,
2899 git_branch: None,
2900 git_commit: None,
2901 git_files_changed: None,
2902 git_insertions: None,
2903 git_deletions: None,
2904 git_ahead: None,
2905 runtime: Runtime::Tmux,
2906 created_at: Utc::now(),
2907 updated_at: Utc::now(),
2908 }];
2909 let output = format_sessions(&sessions);
2910 assert!(output.contains("ID"));
2911 assert!(output.contains("NAME"));
2912 assert!(output.contains("REPO"));
2913 assert!(output.contains("COMMAND"));
2914 assert!(output.contains("00000000"));
2915 assert!(output.contains("my-api"));
2916 assert!(output.contains("active"));
2917 assert!(output.contains("claude -p 'Fix the bug'"));
2918 }
2919
2920 #[test]
2921 fn test_format_repo_without_branch() {
2922 let s = repo_session("/home/user/test", None);
2923 assert_eq!(format_repo(&s), "test");
2924 }
2925
2926 #[test]
2927 fn test_format_repo_with_branch() {
2928 let s = repo_session("/home/user/pulpo", Some("main"));
2929 assert_eq!(format_repo(&s), "pulpo@main");
2930 }
2931
2932 #[test]
2933 fn test_format_repo_truncates_long() {
2934 let s = repo_session(
2935 "/home/user/my-very-long-repo",
2936 Some("feature/my-long-branch"),
2937 );
2938 let result = format_repo(&s);
2939 assert!(result.len() <= 30);
2940 assert!(result.ends_with("..."));
2941 }
2942
2943 #[test]
2944 fn test_format_repo_root_path() {
2945 let s = repo_session("/", None);
2946 assert_eq!(format_repo(&s), "");
2947 }
2948
2949 #[test]
2950 fn test_format_repo_with_diff_stats() {
2951 let mut s = repo_session("/home/user/pulpo", Some("main"));
2952 s.git_insertions = Some(42);
2953 s.git_deletions = Some(7);
2954 let result = format_repo(&s);
2955 assert!(result.contains("+42/-7"));
2956 }
2957
2958 #[test]
2959 fn test_format_repo_with_ahead() {
2960 let mut s = repo_session("/home/user/pulpo", Some("main"));
2961 s.git_ahead = Some(3);
2962 let result = format_repo(&s);
2963 assert!(result.contains("\u{2191}3"));
2964 }
2965
2966 #[test]
2967 fn test_format_repo_zero_diff_hidden() {
2968 let mut s = repo_session("/home/user/pulpo", None);
2969 s.git_insertions = Some(0);
2970 s.git_deletions = Some(0);
2971 let result = format_repo(&s);
2972 assert!(!result.contains("+0/-0"));
2973 }
2974
2975 #[test]
2976 fn test_format_repo_zero_ahead_hidden() {
2977 let mut s = repo_session("/home/user/pulpo", None);
2978 s.git_ahead = Some(0);
2979 let result = format_repo(&s);
2980 assert!(!result.contains('\u{2191}'));
2981 }
2982
2983 #[test]
2984 fn test_format_sessions_with_git_branch() {
2985 use chrono::Utc;
2986 use pulpo_common::session::SessionStatus;
2987 use uuid::Uuid;
2988
2989 let sessions = vec![Session {
2990 id: Uuid::nil(),
2991 name: "my-api".into(),
2992 workdir: "/tmp/repo".into(),
2993 command: "echo hello".into(),
2994 description: None,
2995 status: SessionStatus::Active,
2996 exit_code: None,
2997 backend_session_id: None,
2998 output_snapshot: None,
2999 metadata: None,
3000 ink: None,
3001 intervention_code: None,
3002 intervention_reason: None,
3003 intervention_at: None,
3004 last_output_at: None,
3005 idle_since: None,
3006 idle_threshold_secs: None,
3007 worktree_path: None,
3008 worktree_branch: None,
3009 git_branch: Some("main".into()),
3010 git_commit: Some("abc1234".into()),
3011 git_files_changed: None,
3012 git_insertions: None,
3013 git_deletions: None,
3014 git_ahead: None,
3015 runtime: Runtime::Tmux,
3016 created_at: Utc::now(),
3017 updated_at: Utc::now(),
3018 }];
3019 let output = format_sessions(&sessions);
3020 assert!(output.contains("repo@main"));
3021 }
3022
3023 #[test]
3024 fn test_format_sessions_with_error_status() {
3025 use chrono::Utc;
3026 use pulpo_common::session::SessionStatus;
3027 use uuid::Uuid;
3028
3029 let mut meta = std::collections::HashMap::new();
3030 meta.insert("error_status".into(), "Compile error".into());
3031 let sessions = vec![Session {
3032 id: Uuid::nil(),
3033 name: "my-api".into(),
3034 workdir: "/tmp/repo".into(),
3035 command: "echo hello".into(),
3036 description: None,
3037 status: SessionStatus::Active,
3038 exit_code: None,
3039 backend_session_id: None,
3040 output_snapshot: None,
3041 metadata: Some(meta),
3042 ink: None,
3043 intervention_code: None,
3044 intervention_reason: None,
3045 intervention_at: None,
3046 last_output_at: None,
3047 idle_since: None,
3048 idle_threshold_secs: None,
3049 worktree_path: None,
3050 worktree_branch: None,
3051 git_branch: None,
3052 git_commit: None,
3053 git_files_changed: None,
3054 git_insertions: None,
3055 git_deletions: None,
3056 git_ahead: None,
3057 runtime: Runtime::Tmux,
3058 created_at: Utc::now(),
3059 updated_at: Utc::now(),
3060 }];
3061 let output = format_sessions(&sessions);
3062 assert!(output.contains("[!]"));
3063 }
3064
3065 #[test]
3066 fn test_format_sessions_docker_runtime() {
3067 use chrono::Utc;
3068 use pulpo_common::session::SessionStatus;
3069 use uuid::Uuid;
3070
3071 let sessions = vec![Session {
3072 id: Uuid::nil(),
3073 name: "sandbox-test".into(),
3074 workdir: "/tmp".into(),
3075 command: "claude".into(),
3076 description: None,
3077 status: SessionStatus::Active,
3078 exit_code: None,
3079 backend_session_id: Some("docker:pulpo-sandbox-test".into()),
3080 output_snapshot: None,
3081 metadata: None,
3082 ink: None,
3083 intervention_code: None,
3084 intervention_reason: None,
3085 intervention_at: None,
3086 last_output_at: None,
3087 idle_since: None,
3088 idle_threshold_secs: None,
3089 worktree_path: None,
3090 worktree_branch: None,
3091 git_branch: None,
3092 git_commit: None,
3093 git_files_changed: None,
3094 git_insertions: None,
3095 git_deletions: None,
3096 git_ahead: None,
3097 runtime: Runtime::Docker,
3098 created_at: Utc::now(),
3099 updated_at: Utc::now(),
3100 }];
3101 let output = format_sessions(&sessions);
3102 assert!(output.contains("tmp"));
3103 }
3104
3105 #[test]
3106 fn test_format_sessions_long_command_truncated() {
3107 use chrono::Utc;
3108 use pulpo_common::session::SessionStatus;
3109 use uuid::Uuid;
3110
3111 let sessions = vec![Session {
3112 id: Uuid::nil(),
3113 name: "test".into(),
3114 workdir: "/tmp".into(),
3115 command:
3116 "claude -p 'A very long command that exceeds fifty characters in total length here'"
3117 .into(),
3118 description: None,
3119 status: SessionStatus::Ready,
3120 exit_code: None,
3121 backend_session_id: None,
3122 output_snapshot: None,
3123 metadata: None,
3124 ink: None,
3125 intervention_code: None,
3126 intervention_reason: None,
3127 intervention_at: None,
3128 last_output_at: None,
3129 idle_since: None,
3130 idle_threshold_secs: None,
3131 worktree_path: None,
3132 worktree_branch: None,
3133 git_branch: None,
3134 git_commit: None,
3135 git_files_changed: None,
3136 git_insertions: None,
3137 git_deletions: None,
3138 git_ahead: None,
3139 runtime: Runtime::Tmux,
3140 created_at: Utc::now(),
3141 updated_at: Utc::now(),
3142 }];
3143 let output = format_sessions(&sessions);
3144 assert!(output.contains("..."));
3145 }
3146
3147 #[test]
3148 fn test_format_sessions_worktree_indicator() {
3149 use chrono::Utc;
3150 use pulpo_common::session::SessionStatus;
3151 use uuid::Uuid;
3152
3153 let sessions = vec![Session {
3154 id: Uuid::nil(),
3155 name: "wt-task".into(),
3156 workdir: "/repo".into(),
3157 command: "claude".into(),
3158 description: None,
3159 status: SessionStatus::Active,
3160 exit_code: None,
3161 backend_session_id: None,
3162 output_snapshot: None,
3163 metadata: None,
3164 ink: None,
3165 intervention_code: None,
3166 intervention_reason: None,
3167 intervention_at: None,
3168 last_output_at: None,
3169 idle_since: None,
3170 idle_threshold_secs: None,
3171 worktree_path: Some("/home/user/.pulpo/worktrees/wt-task".into()),
3172 worktree_branch: Some("wt-task".into()),
3173 git_branch: None,
3174 git_commit: None,
3175 git_files_changed: None,
3176 git_insertions: None,
3177 git_deletions: None,
3178 git_ahead: None,
3179 runtime: Runtime::Tmux,
3180 created_at: Utc::now(),
3181 updated_at: Utc::now(),
3182 }];
3183 let output = format_sessions(&sessions);
3184 assert!(
3185 output.contains("[wt]"),
3186 "should show worktree indicator: {output}"
3187 );
3188 assert!(output.contains("wt-task [wt]"));
3189 }
3190
3191 #[test]
3192 fn test_format_sessions_pr_indicator() {
3193 use chrono::Utc;
3194 use pulpo_common::session::SessionStatus;
3195 use std::collections::HashMap;
3196 use uuid::Uuid;
3197
3198 let mut meta = HashMap::new();
3199 meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
3200 let sessions = vec![Session {
3201 id: Uuid::nil(),
3202 name: "pr-task".into(),
3203 workdir: "/tmp".into(),
3204 command: "claude".into(),
3205 description: None,
3206 status: SessionStatus::Active,
3207 exit_code: None,
3208 backend_session_id: None,
3209 output_snapshot: None,
3210 metadata: Some(meta),
3211 ink: None,
3212 intervention_code: None,
3213 intervention_reason: None,
3214 intervention_at: None,
3215 last_output_at: None,
3216 idle_since: None,
3217 idle_threshold_secs: None,
3218 worktree_path: None,
3219 worktree_branch: None,
3220 git_branch: None,
3221 git_commit: None,
3222 git_files_changed: None,
3223 git_insertions: None,
3224 git_deletions: None,
3225 git_ahead: None,
3226 runtime: Runtime::Tmux,
3227 created_at: Utc::now(),
3228 updated_at: Utc::now(),
3229 }];
3230 let output = format_sessions(&sessions);
3231 assert!(
3232 output.contains("[PR]"),
3233 "should show PR indicator: {output}"
3234 );
3235 assert!(output.contains("pr-task [PR]"));
3236 }
3237
3238 #[test]
3239 fn test_format_sessions_worktree_and_pr_indicator() {
3240 use chrono::Utc;
3241 use pulpo_common::session::SessionStatus;
3242 use std::collections::HashMap;
3243 use uuid::Uuid;
3244
3245 let mut meta = HashMap::new();
3246 meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
3247 let sessions = vec![Session {
3248 id: Uuid::nil(),
3249 name: "both-task".into(),
3250 workdir: "/tmp".into(),
3251 command: "claude".into(),
3252 description: None,
3253 status: SessionStatus::Active,
3254 exit_code: None,
3255 backend_session_id: None,
3256 output_snapshot: None,
3257 metadata: Some(meta),
3258 ink: None,
3259 intervention_code: None,
3260 intervention_reason: None,
3261 intervention_at: None,
3262 last_output_at: None,
3263 idle_since: None,
3264 idle_threshold_secs: None,
3265 worktree_path: Some("/home/user/.pulpo/worktrees/both-task".into()),
3266 worktree_branch: Some("both-task".into()),
3267 git_branch: None,
3268 git_commit: None,
3269 git_files_changed: None,
3270 git_insertions: None,
3271 git_deletions: None,
3272 git_ahead: None,
3273 runtime: Runtime::Tmux,
3274 created_at: Utc::now(),
3275 updated_at: Utc::now(),
3276 }];
3277 let output = format_sessions(&sessions);
3278 assert!(
3279 output.contains("[wt] [PR]"),
3280 "should show both indicators: {output}"
3281 );
3282 }
3283
3284 #[test]
3285 fn test_format_sessions_no_pr_without_metadata() {
3286 use chrono::Utc;
3287 use pulpo_common::session::SessionStatus;
3288 use uuid::Uuid;
3289
3290 let sessions = vec![Session {
3291 id: Uuid::nil(),
3292 name: "no-pr".into(),
3293 workdir: "/tmp".into(),
3294 command: "claude".into(),
3295 description: None,
3296 status: SessionStatus::Active,
3297 exit_code: None,
3298 backend_session_id: None,
3299 output_snapshot: None,
3300 metadata: None,
3301 ink: None,
3302 intervention_code: None,
3303 intervention_reason: None,
3304 intervention_at: None,
3305 last_output_at: None,
3306 idle_since: None,
3307 idle_threshold_secs: None,
3308 worktree_path: None,
3309 worktree_branch: None,
3310 git_branch: None,
3311 git_commit: None,
3312 git_files_changed: None,
3313 git_insertions: None,
3314 git_deletions: None,
3315 git_ahead: None,
3316 runtime: Runtime::Tmux,
3317 created_at: Utc::now(),
3318 updated_at: Utc::now(),
3319 }];
3320 let output = format_sessions(&sessions);
3321 assert!(
3322 !output.contains("[PR]"),
3323 "should not show PR indicator: {output}"
3324 );
3325 }
3326
3327 #[test]
3328 fn test_format_nodes() {
3329 use pulpo_common::node::NodeInfo;
3330 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
3331
3332 let resp = PeersResponse {
3333 local: NodeInfo {
3334 name: "mac-mini".into(),
3335 hostname: "h".into(),
3336 os: "macos".into(),
3337 arch: "arm64".into(),
3338 cpus: 8,
3339 memory_mb: 16384,
3340 gpu: None,
3341 },
3342 peers: vec![PeerInfo {
3343 name: "win-pc".into(),
3344 address: "win-pc:7433".into(),
3345 status: PeerStatus::Online,
3346 node_info: None,
3347 session_count: Some(3),
3348 source: PeerSource::Configured,
3349 }],
3350 };
3351 let output = format_nodes(&resp);
3352 assert!(output.contains("mac-mini"));
3353 assert!(output.contains("(local)"));
3354 assert!(output.contains("win-pc"));
3355 assert!(output.contains('3'));
3356 }
3357
3358 #[test]
3359 fn test_format_nodes_no_session_count() {
3360 use pulpo_common::node::NodeInfo;
3361 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
3362
3363 let resp = PeersResponse {
3364 local: NodeInfo {
3365 name: "local".into(),
3366 hostname: "h".into(),
3367 os: "linux".into(),
3368 arch: "x86_64".into(),
3369 cpus: 4,
3370 memory_mb: 8192,
3371 gpu: None,
3372 },
3373 peers: vec![PeerInfo {
3374 name: "peer".into(),
3375 address: "peer:7433".into(),
3376 status: PeerStatus::Offline,
3377 node_info: None,
3378 session_count: None,
3379 source: PeerSource::Configured,
3380 }],
3381 };
3382 let output = format_nodes(&resp);
3383 assert!(output.contains("offline"));
3384 let lines: Vec<&str> = output.lines().collect();
3386 assert!(lines[2].contains('-'));
3387 }
3388
3389 #[tokio::test]
3390 async fn test_execute_resume_connection_refused() {
3391 let cli = Cli {
3392 node: "localhost:1".into(),
3393 token: None,
3394 command: Some(Commands::Resume {
3395 name: "test".into(),
3396 }),
3397 path: None,
3398 };
3399 let result = execute(&cli).await;
3400 let err = result.unwrap_err().to_string();
3401 assert!(err.contains("Could not connect to pulpod"));
3402 }
3403
3404 #[tokio::test]
3405 async fn test_execute_spawn_connection_refused() {
3406 let cli = Cli {
3407 node: "localhost:1".into(),
3408 token: None,
3409 command: Some(Commands::Spawn {
3410 name: Some("test".into()),
3411 workdir: Some("/tmp".into()),
3412 ink: None,
3413 description: None,
3414 detach: true,
3415 idle_threshold: None,
3416 auto: false,
3417 worktree: false,
3418 worktree_base: None,
3419 runtime: None,
3420 secret: vec![],
3421 command: vec!["test".into()],
3422 }),
3423 path: None,
3424 };
3425 let result = execute(&cli).await;
3426 let err = result.unwrap_err().to_string();
3427 assert!(err.contains("Could not connect to pulpod"));
3428 }
3429
3430 #[tokio::test]
3431 async fn test_execute_stop_connection_refused() {
3432 let cli = Cli {
3433 node: "localhost:1".into(),
3434 token: None,
3435 command: Some(Commands::Stop {
3436 names: vec!["test".into()],
3437 purge: false,
3438 }),
3439 path: None,
3440 };
3441 let result = execute(&cli).await;
3442 let err = result.unwrap_err().to_string();
3443 assert!(err.contains("Could not connect to pulpod"));
3444 }
3445
3446 #[tokio::test]
3447 async fn test_execute_logs_connection_refused() {
3448 let cli = Cli {
3449 node: "localhost:1".into(),
3450 token: None,
3451 command: Some(Commands::Logs {
3452 name: "test".into(),
3453 lines: 50,
3454 follow: false,
3455 }),
3456 path: None,
3457 };
3458 let result = execute(&cli).await;
3459 let err = result.unwrap_err().to_string();
3460 assert!(err.contains("Could not connect to pulpod"));
3461 }
3462
3463 #[tokio::test]
3464 async fn test_friendly_error_connect() {
3465 let err = reqwest::Client::new()
3467 .get("http://127.0.0.1:1")
3468 .send()
3469 .await
3470 .unwrap_err();
3471 let friendly = friendly_error(&err, "test-node:1");
3472 let msg = friendly.to_string();
3473 assert!(
3474 msg.contains("Could not connect"),
3475 "Expected connect message, got: {msg}"
3476 );
3477 }
3478
3479 #[tokio::test]
3480 async fn test_friendly_error_other() {
3481 let err = reqwest::Client::new()
3483 .get("http://[::invalid::url")
3484 .send()
3485 .await
3486 .unwrap_err();
3487 let friendly = friendly_error(&err, "bad-host");
3488 let msg = friendly.to_string();
3489 assert!(
3490 msg.contains("Network error"),
3491 "Expected network error message, got: {msg}"
3492 );
3493 assert!(msg.contains("bad-host"));
3494 }
3495
3496 #[test]
3499 fn test_is_localhost_variants() {
3500 assert!(is_localhost("localhost:7433"));
3501 assert!(is_localhost("127.0.0.1:7433"));
3502 assert!(is_localhost("[::1]:7433"));
3503 assert!(is_localhost("::1"));
3504 assert!(is_localhost("localhost"));
3505 assert!(!is_localhost("mac-mini:7433"));
3506 assert!(!is_localhost("192.168.1.100:7433"));
3507 }
3508
3509 #[test]
3510 fn test_authed_get_with_token() {
3511 let client = reqwest::Client::new();
3512 let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
3513 .build()
3514 .unwrap();
3515 let auth = req
3516 .headers()
3517 .get("authorization")
3518 .unwrap()
3519 .to_str()
3520 .unwrap();
3521 assert_eq!(auth, "Bearer tok");
3522 }
3523
3524 #[test]
3525 fn test_authed_get_without_token() {
3526 let client = reqwest::Client::new();
3527 let req = authed_get(&client, "http://h:1/api".into(), None)
3528 .build()
3529 .unwrap();
3530 assert!(req.headers().get("authorization").is_none());
3531 }
3532
3533 #[test]
3534 fn test_authed_post_with_token() {
3535 let client = reqwest::Client::new();
3536 let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
3537 .build()
3538 .unwrap();
3539 let auth = req
3540 .headers()
3541 .get("authorization")
3542 .unwrap()
3543 .to_str()
3544 .unwrap();
3545 assert_eq!(auth, "Bearer secret");
3546 }
3547
3548 #[test]
3549 fn test_authed_post_without_token() {
3550 let client = reqwest::Client::new();
3551 let req = authed_post(&client, "http://h:1/api".into(), None)
3552 .build()
3553 .unwrap();
3554 assert!(req.headers().get("authorization").is_none());
3555 }
3556
3557 #[test]
3558 fn test_authed_delete_with_token() {
3559 let client = reqwest::Client::new();
3560 let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
3561 .build()
3562 .unwrap();
3563 let auth = req
3564 .headers()
3565 .get("authorization")
3566 .unwrap()
3567 .to_str()
3568 .unwrap();
3569 assert_eq!(auth, "Bearer del-tok");
3570 }
3571
3572 #[test]
3573 fn test_authed_delete_without_token() {
3574 let client = reqwest::Client::new();
3575 let req = authed_delete(&client, "http://h:1/api".into(), None)
3576 .build()
3577 .unwrap();
3578 assert!(req.headers().get("authorization").is_none());
3579 }
3580
3581 #[tokio::test]
3582 async fn test_resolve_token_explicit() {
3583 let client = reqwest::Client::new();
3584 let token =
3585 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
3586 assert_eq!(token, Some("my-tok".into()));
3587 }
3588
3589 #[tokio::test]
3590 async fn test_resolve_token_remote_no_explicit() {
3591 let client = reqwest::Client::new();
3592 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
3593 assert_eq!(token, None);
3594 }
3595
3596 #[tokio::test]
3597 async fn test_resolve_token_localhost_auto_discover() {
3598 use axum::{Json, Router, routing::get};
3599
3600 let app = Router::new().route(
3601 "/api/v1/auth/token",
3602 get(|| async {
3603 Json(AuthTokenResponse {
3604 token: "discovered".into(),
3605 })
3606 }),
3607 );
3608 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3609 let addr = listener.local_addr().unwrap();
3610 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3611
3612 let node = format!("localhost:{}", addr.port());
3613 let base = base_url(&node);
3614 let client = reqwest::Client::new();
3615 let token = resolve_token(&client, &base, &node, None).await;
3616 assert_eq!(token, Some("discovered".into()));
3617 }
3618
3619 #[tokio::test]
3620 async fn test_discover_token_empty_returns_none() {
3621 use axum::{Json, Router, routing::get};
3622
3623 let app = Router::new().route(
3624 "/api/v1/auth/token",
3625 get(|| async {
3626 Json(AuthTokenResponse {
3627 token: String::new(),
3628 })
3629 }),
3630 );
3631 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3632 let addr = listener.local_addr().unwrap();
3633 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3634
3635 let base = format!("http://127.0.0.1:{}", addr.port());
3636 let client = reqwest::Client::new();
3637 assert_eq!(discover_token(&client, &base).await, None);
3638 }
3639
3640 #[tokio::test]
3641 async fn test_discover_token_unreachable_returns_none() {
3642 let client = reqwest::Client::new();
3643 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3644 }
3645
3646 #[test]
3647 fn test_cli_parse_with_token() {
3648 let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
3649 assert_eq!(cli.token, Some("my-secret".into()));
3650 }
3651
3652 #[test]
3653 fn test_cli_parse_without_token() {
3654 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
3655 assert_eq!(cli.token, None);
3656 }
3657
3658 #[tokio::test]
3659 async fn test_execute_with_explicit_token_sends_header() {
3660 use axum::{Router, extract::Request, http::StatusCode, routing::get};
3661
3662 let app = Router::new().route(
3663 "/api/v1/sessions",
3664 get(|req: Request| async move {
3665 let auth = req
3666 .headers()
3667 .get("authorization")
3668 .and_then(|v| v.to_str().ok())
3669 .unwrap_or("");
3670 assert_eq!(auth, "Bearer test-token");
3671 (StatusCode::OK, "[]".to_owned())
3672 }),
3673 );
3674 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3675 let addr = listener.local_addr().unwrap();
3676 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3677 let node = format!("127.0.0.1:{}", addr.port());
3678
3679 let cli = Cli {
3680 node,
3681 token: Some("test-token".into()),
3682 command: Some(Commands::List { all: false }),
3683 path: None,
3684 };
3685 let result = execute(&cli).await.unwrap();
3686 assert_eq!(result, "No sessions.");
3687 }
3688
3689 #[test]
3692 fn test_cli_parse_interventions() {
3693 let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
3694 assert!(matches!(
3695 &cli.command,
3696 Some(Commands::Interventions { name }) if name == "my-session"
3697 ));
3698 }
3699
3700 #[test]
3701 fn test_format_interventions_empty() {
3702 assert_eq!(format_interventions(&[]), "No intervention events.");
3703 }
3704
3705 #[test]
3706 fn test_format_interventions_with_data() {
3707 let events = vec![
3708 InterventionEventResponse {
3709 id: 1,
3710 session_id: "sess-1".into(),
3711 code: None,
3712 reason: "Memory exceeded threshold".into(),
3713 created_at: "2026-01-01T00:00:00Z".into(),
3714 },
3715 InterventionEventResponse {
3716 id: 2,
3717 session_id: "sess-1".into(),
3718 code: None,
3719 reason: "Idle for 10 minutes".into(),
3720 created_at: "2026-01-02T00:00:00Z".into(),
3721 },
3722 ];
3723 let output = format_interventions(&events);
3724 assert!(output.contains("ID"));
3725 assert!(output.contains("TIMESTAMP"));
3726 assert!(output.contains("REASON"));
3727 assert!(output.contains("Memory exceeded threshold"));
3728 assert!(output.contains("Idle for 10 minutes"));
3729 assert!(output.contains("2026-01-01T00:00:00Z"));
3730 }
3731
3732 #[tokio::test]
3733 async fn test_execute_interventions_empty() {
3734 let node = start_test_server().await;
3735 let cli = Cli {
3736 node,
3737 token: None,
3738 command: Some(Commands::Interventions {
3739 name: "my-session".into(),
3740 }),
3741 path: None,
3742 };
3743 let result = execute(&cli).await.unwrap();
3744 assert_eq!(result, "No intervention events.");
3745 }
3746
3747 #[tokio::test]
3748 async fn test_execute_interventions_with_data() {
3749 use axum::{Router, routing::get};
3750
3751 let app = Router::new().route(
3752 "/api/v1/sessions/{id}/interventions",
3753 get(|| async {
3754 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3755 .to_owned()
3756 }),
3757 );
3758 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3759 let addr = listener.local_addr().unwrap();
3760 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3761 let node = format!("127.0.0.1:{}", addr.port());
3762
3763 let cli = Cli {
3764 node,
3765 token: None,
3766 command: Some(Commands::Interventions {
3767 name: "test".into(),
3768 }),
3769 path: None,
3770 };
3771 let result = execute(&cli).await.unwrap();
3772 assert!(result.contains("OOM"));
3773 assert!(result.contains("2026-01-01T00:00:00Z"));
3774 }
3775
3776 #[tokio::test]
3777 async fn test_execute_interventions_connection_refused() {
3778 let cli = Cli {
3779 node: "localhost:1".into(),
3780 token: None,
3781 command: Some(Commands::Interventions {
3782 name: "test".into(),
3783 }),
3784 path: None,
3785 };
3786 let result = execute(&cli).await;
3787 let err = result.unwrap_err().to_string();
3788 assert!(err.contains("Could not connect to pulpod"));
3789 }
3790
3791 #[test]
3794 fn test_build_attach_command_tmux() {
3795 let cmd = build_attach_command("my-session");
3796 assert_eq!(cmd.get_program(), "tmux");
3797 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3798 assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
3799 }
3800
3801 #[test]
3802 fn test_build_attach_command_docker() {
3803 let cmd = build_attach_command("docker:pulpo-my-task");
3804 assert_eq!(cmd.get_program(), "docker");
3805 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3806 assert_eq!(args, vec!["exec", "-it", "pulpo-my-task", "/bin/sh"]);
3807 }
3808
3809 #[test]
3810 fn test_cli_parse_attach() {
3811 let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
3812 assert!(matches!(
3813 &cli.command,
3814 Some(Commands::Attach { name }) if name == "my-session"
3815 ));
3816 }
3817
3818 #[test]
3819 fn test_cli_parse_attach_alias() {
3820 let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
3821 assert!(matches!(
3822 &cli.command,
3823 Some(Commands::Attach { name }) if name == "my-session"
3824 ));
3825 }
3826
3827 #[tokio::test]
3828 async fn test_execute_attach_success() {
3829 let node = start_test_server().await;
3830 let cli = Cli {
3831 node,
3832 token: None,
3833 command: Some(Commands::Attach {
3834 name: "test-session".into(),
3835 }),
3836 path: None,
3837 };
3838 let result = execute(&cli).await.unwrap();
3839 assert!(result.contains("Detached from session test-session"));
3840 }
3841
3842 #[tokio::test]
3843 async fn test_execute_attach_with_backend_session_id() {
3844 use axum::{Router, routing::get};
3845 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3846 let app = Router::new().route(
3847 "/api/v1/sessions/{id}",
3848 get(move || async move { session_json.to_owned() }),
3849 );
3850 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3851 let addr = listener.local_addr().unwrap();
3852 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3853
3854 let cli = Cli {
3855 node: format!("127.0.0.1:{}", addr.port()),
3856 token: None,
3857 command: Some(Commands::Attach {
3858 name: "my-session".into(),
3859 }),
3860 path: None,
3861 };
3862 let result = execute(&cli).await.unwrap();
3863 assert!(result.contains("Detached from session my-session"));
3864 }
3865
3866 #[tokio::test]
3867 async fn test_execute_attach_connection_refused() {
3868 let cli = Cli {
3869 node: "localhost:1".into(),
3870 token: None,
3871 command: Some(Commands::Attach {
3872 name: "test-session".into(),
3873 }),
3874 path: None,
3875 };
3876 let result = execute(&cli).await;
3877 let err = result.unwrap_err().to_string();
3878 assert!(err.contains("Could not connect to pulpod"));
3879 }
3880
3881 #[tokio::test]
3882 async fn test_execute_attach_error_response() {
3883 use axum::{Router, http::StatusCode, routing::get};
3884 let app = Router::new().route(
3885 "/api/v1/sessions/{id}",
3886 get(|| async {
3887 (
3888 StatusCode::NOT_FOUND,
3889 r#"{"error":"session not found"}"#.to_owned(),
3890 )
3891 }),
3892 );
3893 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3894 let addr = listener.local_addr().unwrap();
3895 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3896
3897 let cli = Cli {
3898 node: format!("127.0.0.1:{}", addr.port()),
3899 token: None,
3900 command: Some(Commands::Attach {
3901 name: "nonexistent".into(),
3902 }),
3903 path: None,
3904 };
3905 let result = execute(&cli).await;
3906 let err = result.unwrap_err().to_string();
3907 assert!(err.contains("session not found"));
3908 }
3909
3910 #[tokio::test]
3911 async fn test_execute_attach_stale_session() {
3912 use axum::{Router, routing::get};
3913 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3914 let app = Router::new().route(
3915 "/api/v1/sessions/{id}",
3916 get(move || async move { session_json.to_owned() }),
3917 );
3918 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3919 let addr = listener.local_addr().unwrap();
3920 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3921
3922 let cli = Cli {
3923 node: format!("127.0.0.1:{}", addr.port()),
3924 token: None,
3925 command: Some(Commands::Attach {
3926 name: "stale-sess".into(),
3927 }),
3928 path: None,
3929 };
3930 let result = execute(&cli).await;
3931 let err = result.unwrap_err().to_string();
3932 assert!(err.contains("lost"));
3933 assert!(err.contains("pulpo resume"));
3934 }
3935
3936 #[tokio::test]
3937 async fn test_execute_attach_dead_session() {
3938 use axum::{Router, routing::get};
3939 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3940 let app = Router::new().route(
3941 "/api/v1/sessions/{id}",
3942 get(move || async move { session_json.to_owned() }),
3943 );
3944 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3945 let addr = listener.local_addr().unwrap();
3946 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3947
3948 let cli = Cli {
3949 node: format!("127.0.0.1:{}", addr.port()),
3950 token: None,
3951 command: Some(Commands::Attach {
3952 name: "dead-sess".into(),
3953 }),
3954 path: None,
3955 };
3956 let result = execute(&cli).await;
3957 let err = result.unwrap_err().to_string();
3958 assert!(err.contains("stopped"));
3959 assert!(err.contains("cannot attach"));
3960 }
3961
3962 #[test]
3965 fn test_cli_parse_alias_spawn() {
3966 let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
3967 assert!(matches!(&cli.command, Some(Commands::Spawn { .. })));
3968 }
3969
3970 #[test]
3971 fn test_cli_parse_alias_list() {
3972 let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
3973 assert!(matches!(&cli.command, Some(Commands::List { all: false })));
3974 }
3975
3976 #[test]
3977 fn test_cli_parse_list_all() {
3978 let cli = Cli::try_parse_from(["pulpo", "ls", "-a"]).unwrap();
3979 assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3980
3981 let cli = Cli::try_parse_from(["pulpo", "list", "--all"]).unwrap();
3982 assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3983 }
3984
3985 #[test]
3986 fn test_cli_parse_alias_logs() {
3987 let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
3988 assert!(matches!(
3989 &cli.command,
3990 Some(Commands::Logs { name, .. }) if name == "my-session"
3991 ));
3992 }
3993
3994 #[test]
3995 fn test_cli_parse_alias_stop() {
3996 let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
3997 assert!(matches!(
3998 &cli.command,
3999 Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
4000 ));
4001 }
4002
4003 #[test]
4004 fn test_cli_parse_alias_resume() {
4005 let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
4006 assert!(matches!(
4007 &cli.command,
4008 Some(Commands::Resume { name }) if name == "my-session"
4009 ));
4010 }
4011
4012 #[test]
4013 fn test_cli_parse_alias_nodes() {
4014 let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
4015 assert!(matches!(&cli.command, Some(Commands::Nodes)));
4016 }
4017
4018 #[test]
4019 fn test_cli_parse_alias_interventions() {
4020 let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
4021 assert!(matches!(
4022 &cli.command,
4023 Some(Commands::Interventions { name }) if name == "my-session"
4024 ));
4025 }
4026
4027 #[test]
4028 fn test_api_error_json() {
4029 let err = api_error("{\"error\":\"session not found: foo\"}");
4030 assert_eq!(err.to_string(), "session not found: foo");
4031 }
4032
4033 #[test]
4034 fn test_api_error_plain_text() {
4035 let err = api_error("plain text error");
4036 assert_eq!(err.to_string(), "plain text error");
4037 }
4038
4039 #[test]
4042 fn test_diff_output_empty_prev() {
4043 assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
4044 }
4045
4046 #[test]
4047 fn test_diff_output_identical() {
4048 assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
4049 }
4050
4051 #[test]
4052 fn test_diff_output_new_lines_appended() {
4053 let prev = "line1\nline2";
4054 let new = "line1\nline2\nline3\nline4";
4055 assert_eq!(diff_output(prev, new), "line3\nline4");
4056 }
4057
4058 #[test]
4059 fn test_diff_output_scrolled_window() {
4060 let prev = "line1\nline2\nline3";
4062 let new = "line2\nline3\nline4";
4063 assert_eq!(diff_output(prev, new), "line4");
4064 }
4065
4066 #[test]
4067 fn test_diff_output_completely_different() {
4068 let prev = "aaa\nbbb";
4069 let new = "xxx\nyyy";
4070 assert_eq!(diff_output(prev, new), "xxx\nyyy");
4071 }
4072
4073 #[test]
4074 fn test_diff_output_last_line_matches_but_overlap_fails() {
4075 let prev = "aaa\ncommon";
4077 let new = "zzz\ncommon\nnew_line";
4078 assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
4082 }
4083
4084 #[test]
4085 fn test_diff_output_new_empty() {
4086 assert_eq!(diff_output("line1", ""), "");
4087 }
4088
4089 async fn start_follow_test_server() -> String {
4094 use axum::{Router, extract::Path, extract::Query, routing::get};
4095 use std::sync::Arc;
4096 use std::sync::atomic::{AtomicUsize, Ordering};
4097
4098 let call_count = Arc::new(AtomicUsize::new(0));
4099 let output_count = call_count.clone();
4100
4101 let app = Router::new()
4102 .route(
4103 "/api/v1/sessions/{id}/output",
4104 get(
4105 move |_path: Path<String>,
4106 _query: Query<std::collections::HashMap<String, String>>| {
4107 let count = output_count.clone();
4108 async move {
4109 let n = count.fetch_add(1, Ordering::SeqCst);
4110 let output = match n {
4111 0 => "line1\nline2".to_owned(),
4112 1 => "line1\nline2\nline3".to_owned(),
4113 _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
4114 };
4115 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
4116 }
4117 },
4118 ),
4119 )
4120 .route(
4121 "/api/v1/sessions/{id}",
4122 get(|_path: Path<String>| async {
4123 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
4124 }),
4125 );
4126
4127 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4128 let addr = listener.local_addr().unwrap();
4129 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4130 format!("http://127.0.0.1:{}", addr.port())
4131 }
4132
4133 #[tokio::test]
4134 async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
4135 let base = start_follow_test_server().await;
4136 let client = reqwest::Client::new();
4137 let mut buf = Vec::new();
4138
4139 follow_logs(&client, &base, "test", 100, None, &mut buf)
4140 .await
4141 .unwrap();
4142
4143 let output = String::from_utf8(buf).unwrap();
4144 assert!(output.contains("line1"));
4146 assert!(output.contains("line2"));
4147 assert!(output.contains("line3"));
4148 assert!(output.contains("line4"));
4149 assert!(output.contains("[pulpo] Agent exited"));
4150 }
4151
4152 #[tokio::test]
4153 async fn test_execute_logs_follow_success() {
4154 let base = start_follow_test_server().await;
4155 let node = base.strip_prefix("http://").unwrap().to_owned();
4157
4158 let cli = Cli {
4159 node,
4160 token: None,
4161 command: Some(Commands::Logs {
4162 name: "test".into(),
4163 lines: 100,
4164 follow: true,
4165 }),
4166 path: None,
4167 };
4168 let result = execute(&cli).await.unwrap();
4170 assert_eq!(result, "");
4171 }
4172
4173 #[tokio::test]
4174 async fn test_execute_logs_follow_connection_refused() {
4175 let cli = Cli {
4176 node: "localhost:1".into(),
4177 token: None,
4178 command: Some(Commands::Logs {
4179 name: "test".into(),
4180 lines: 50,
4181 follow: true,
4182 }),
4183 path: None,
4184 };
4185 let result = execute(&cli).await;
4186 let err = result.unwrap_err().to_string();
4187 assert!(
4188 err.contains("Could not connect to pulpod"),
4189 "Expected friendly error, got: {err}"
4190 );
4191 }
4192
4193 #[tokio::test]
4194 async fn test_follow_logs_exits_on_dead() {
4195 use axum::{Router, extract::Path, extract::Query, routing::get};
4196
4197 let app = Router::new()
4198 .route(
4199 "/api/v1/sessions/{id}/output",
4200 get(
4201 |_path: Path<String>,
4202 _query: Query<std::collections::HashMap<String, String>>| async {
4203 r#"{"output":"some output"}"#.to_owned()
4204 },
4205 ),
4206 )
4207 .route(
4208 "/api/v1/sessions/{id}",
4209 get(|_path: Path<String>| async {
4210 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
4211 }),
4212 );
4213
4214 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4215 let addr = listener.local_addr().unwrap();
4216 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4217 let base = format!("http://127.0.0.1:{}", addr.port());
4218
4219 let client = reqwest::Client::new();
4220 let mut buf = Vec::new();
4221 follow_logs(&client, &base, "test", 100, None, &mut buf)
4222 .await
4223 .unwrap();
4224
4225 let output = String::from_utf8(buf).unwrap();
4226 assert!(output.contains("some output"));
4227 }
4228
4229 #[tokio::test]
4230 async fn test_follow_logs_exits_on_stale() {
4231 use axum::{Router, extract::Path, extract::Query, routing::get};
4232
4233 let app = Router::new()
4234 .route(
4235 "/api/v1/sessions/{id}/output",
4236 get(
4237 |_path: Path<String>,
4238 _query: Query<std::collections::HashMap<String, String>>| async {
4239 r#"{"output":"stale output"}"#.to_owned()
4240 },
4241 ),
4242 )
4243 .route(
4244 "/api/v1/sessions/{id}",
4245 get(|_path: Path<String>| async {
4246 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
4247 }),
4248 );
4249
4250 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4251 let addr = listener.local_addr().unwrap();
4252 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4253 let base = format!("http://127.0.0.1:{}", addr.port());
4254
4255 let client = reqwest::Client::new();
4256 let mut buf = Vec::new();
4257 follow_logs(&client, &base, "test", 100, None, &mut buf)
4258 .await
4259 .unwrap();
4260
4261 let output = String::from_utf8(buf).unwrap();
4262 assert!(output.contains("stale output"));
4263 }
4264
4265 #[tokio::test]
4266 async fn test_execute_logs_follow_non_reqwest_error() {
4267 use axum::{Router, extract::Path, extract::Query, routing::get};
4268
4269 let app = Router::new()
4271 .route(
4272 "/api/v1/sessions/{id}/output",
4273 get(
4274 |_path: Path<String>,
4275 _query: Query<std::collections::HashMap<String, String>>| async {
4276 r#"{"output":"initial"}"#.to_owned()
4277 },
4278 ),
4279 )
4280 .route(
4281 "/api/v1/sessions/{id}",
4282 get(|_path: Path<String>| async { "not valid json".to_owned() }),
4283 );
4284
4285 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4286 let addr = listener.local_addr().unwrap();
4287 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4288 let node = format!("127.0.0.1:{}", addr.port());
4289
4290 let cli = Cli {
4291 node,
4292 token: None,
4293 command: Some(Commands::Logs {
4294 name: "test".into(),
4295 lines: 100,
4296 follow: true,
4297 }),
4298 path: None,
4299 };
4300 let err = execute(&cli).await.unwrap_err();
4301 let msg = err.to_string();
4303 assert!(
4304 msg.contains("expected ident"),
4305 "Expected serde parse error, got: {msg}"
4306 );
4307 }
4308
4309 #[tokio::test]
4310 async fn test_fetch_session_status_connection_error() {
4311 let client = reqwest::Client::new();
4312 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
4313 assert!(result.is_err());
4314 }
4315
4316 #[test]
4319 fn test_format_schedules_empty() {
4320 assert_eq!(format_schedules(&[]), "No schedules.");
4321 }
4322
4323 #[test]
4324 fn test_format_schedules_with_entries() {
4325 let schedules = vec![serde_json::json!({
4326 "name": "nightly",
4327 "cron": "0 3 * * *",
4328 "enabled": true,
4329 "last_run_at": null,
4330 "target_node": null
4331 })];
4332 let output = format_schedules(&schedules);
4333 assert!(output.contains("nightly"));
4334 assert!(output.contains("0 3 * * *"));
4335 assert!(output.contains("local"));
4336 assert!(output.contains("yes"));
4337 assert!(output.contains('-'));
4338 }
4339
4340 #[test]
4341 fn test_format_schedules_disabled_entry() {
4342 let schedules = vec![serde_json::json!({
4343 "name": "weekly",
4344 "cron": "0 0 * * 0",
4345 "enabled": false,
4346 "last_run_at": "2026-03-18T03:00:00Z",
4347 "target_node": "gpu-box"
4348 })];
4349 let output = format_schedules(&schedules);
4350 assert!(output.contains("weekly"));
4351 assert!(output.contains("no"));
4352 assert!(output.contains("gpu-box"));
4353 assert!(output.contains("2026-03-18T03:00"));
4354 }
4355
4356 #[test]
4357 fn test_format_schedules_header() {
4358 let schedules = vec![serde_json::json!({
4359 "name": "test",
4360 "cron": "* * * * *",
4361 "enabled": true,
4362 "last_run_at": null,
4363 "target_node": null
4364 })];
4365 let output = format_schedules(&schedules);
4366 assert!(output.contains("NAME"));
4367 assert!(output.contains("CRON"));
4368 assert!(output.contains("ENABLED"));
4369 assert!(output.contains("LAST RUN"));
4370 assert!(output.contains("NODE"));
4371 }
4372
4373 #[test]
4376 fn test_cli_parse_schedule_add() {
4377 let cli = Cli::try_parse_from([
4378 "pulpo",
4379 "schedule",
4380 "add",
4381 "nightly",
4382 "0 3 * * *",
4383 "--workdir",
4384 "/repo",
4385 "--",
4386 "claude",
4387 "-p",
4388 "review",
4389 ])
4390 .unwrap();
4391 assert!(matches!(
4392 &cli.command,
4393 Some(Commands::Schedule {
4394 action: ScheduleAction::Add { name, cron, .. }
4395 }) if name == "nightly" && cron == "0 3 * * *"
4396 ));
4397 }
4398
4399 #[test]
4400 fn test_cli_parse_schedule_add_with_node() {
4401 let cli = Cli::try_parse_from([
4402 "pulpo",
4403 "schedule",
4404 "add",
4405 "nightly",
4406 "0 3 * * *",
4407 "--workdir",
4408 "/repo",
4409 "--node",
4410 "gpu-box",
4411 "--",
4412 "claude",
4413 ])
4414 .unwrap();
4415 assert!(matches!(
4416 &cli.command,
4417 Some(Commands::Schedule {
4418 action: ScheduleAction::Add { node, .. }
4419 }) if node.as_deref() == Some("gpu-box")
4420 ));
4421 }
4422
4423 #[test]
4424 fn test_cli_parse_schedule_add_install_alias() {
4425 let cli =
4426 Cli::try_parse_from(["pulpo", "schedule", "install", "nightly", "0 3 * * *"]).unwrap();
4427 assert!(matches!(
4428 &cli.command,
4429 Some(Commands::Schedule {
4430 action: ScheduleAction::Add { name, .. }
4431 }) if name == "nightly"
4432 ));
4433 }
4434
4435 #[test]
4436 fn test_cli_parse_schedule_list() {
4437 let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
4438 assert!(matches!(
4439 &cli.command,
4440 Some(Commands::Schedule {
4441 action: ScheduleAction::List
4442 })
4443 ));
4444 }
4445
4446 #[test]
4447 fn test_cli_parse_schedule_remove() {
4448 let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
4449 assert!(matches!(
4450 &cli.command,
4451 Some(Commands::Schedule {
4452 action: ScheduleAction::Remove { name }
4453 }) if name == "nightly"
4454 ));
4455 }
4456
4457 #[test]
4458 fn test_cli_parse_schedule_pause() {
4459 let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
4460 assert!(matches!(
4461 &cli.command,
4462 Some(Commands::Schedule {
4463 action: ScheduleAction::Pause { name }
4464 }) if name == "nightly"
4465 ));
4466 }
4467
4468 #[test]
4469 fn test_cli_parse_schedule_resume() {
4470 let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
4471 assert!(matches!(
4472 &cli.command,
4473 Some(Commands::Schedule {
4474 action: ScheduleAction::Resume { name }
4475 }) if name == "nightly"
4476 ));
4477 }
4478
4479 #[test]
4480 fn test_cli_parse_schedule_alias() {
4481 let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
4482 assert!(matches!(
4483 &cli.command,
4484 Some(Commands::Schedule {
4485 action: ScheduleAction::List
4486 })
4487 ));
4488 }
4489
4490 #[test]
4491 fn test_cli_parse_schedule_list_alias() {
4492 let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
4493 assert!(matches!(
4494 &cli.command,
4495 Some(Commands::Schedule {
4496 action: ScheduleAction::List
4497 })
4498 ));
4499 }
4500
4501 #[test]
4502 fn test_cli_parse_schedule_remove_alias() {
4503 let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
4504 assert!(matches!(
4505 &cli.command,
4506 Some(Commands::Schedule {
4507 action: ScheduleAction::Remove { name }
4508 }) if name == "nightly"
4509 ));
4510 }
4511
4512 #[tokio::test]
4513 async fn test_execute_schedule_list_via_execute() {
4514 let node = start_test_server().await;
4515 let cli = Cli {
4516 node,
4517 token: None,
4518 command: Some(Commands::Schedule {
4519 action: ScheduleAction::List,
4520 }),
4521 path: None,
4522 };
4523 let result = execute(&cli).await.unwrap();
4524 #[cfg(coverage)]
4526 assert!(result.is_empty());
4527 #[cfg(not(coverage))]
4528 assert_eq!(result, "No schedules.");
4529 }
4530
4531 #[test]
4532 fn test_schedule_action_debug() {
4533 let action = ScheduleAction::List;
4534 assert_eq!(format!("{action:?}"), "List");
4535 }
4536
4537 #[test]
4538 fn test_cli_parse_send_alias() {
4539 let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
4540 assert!(matches!(
4541 &cli.command,
4542 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
4543 ));
4544 }
4545
4546 #[test]
4547 fn test_cli_parse_spawn_no_name() {
4548 let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
4549 assert!(matches!(
4550 &cli.command,
4551 Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
4552 ));
4553 }
4554
4555 #[test]
4556 fn test_cli_parse_spawn_optional_name_with_command() {
4557 let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
4558 assert!(matches!(
4559 &cli.command,
4560 Some(Commands::Spawn { name, command, .. })
4561 if name.is_none() && command == &["echo", "hello"]
4562 ));
4563 }
4564
4565 #[test]
4566 fn test_cli_parse_path_shortcut() {
4567 let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
4568 assert!(cli.command.is_none());
4569 assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
4570 }
4571
4572 #[test]
4573 fn test_cli_parse_no_args() {
4574 let cli = Cli::try_parse_from(["pulpo"]).unwrap();
4575 assert!(cli.command.is_none());
4576 assert!(cli.path.is_none());
4577 }
4578
4579 #[test]
4580 fn test_derive_session_name_simple() {
4581 assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
4582 }
4583
4584 #[test]
4585 fn test_derive_session_name_with_special_chars() {
4586 assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
4587 }
4588
4589 #[test]
4590 fn test_derive_session_name_root() {
4591 assert_eq!(derive_session_name("/"), "session");
4592 }
4593
4594 #[test]
4595 fn test_derive_session_name_dots() {
4596 assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
4597 }
4598
4599 #[test]
4600 fn test_resolve_path_absolute() {
4601 assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
4602 }
4603
4604 #[test]
4605 fn test_resolve_path_relative() {
4606 let resolved = resolve_path("my-repo");
4607 assert!(resolved.ends_with("my-repo"));
4608 assert!(resolved.starts_with('/'));
4609 }
4610
4611 #[tokio::test]
4612 async fn test_execute_no_args_shows_help() {
4613 let node = start_test_server().await;
4614 let cli = Cli {
4615 node,
4616 token: None,
4617 path: None,
4618 command: None,
4619 };
4620 let result = execute(&cli).await.unwrap();
4621 assert!(
4622 result.is_empty(),
4623 "no-args should return empty string after printing help"
4624 );
4625 }
4626
4627 #[tokio::test]
4628 async fn test_execute_path_shortcut() {
4629 let node = start_test_server().await;
4630 let cli = Cli {
4631 node,
4632 token: None,
4633 path: Some("/tmp".into()),
4634 command: None,
4635 };
4636 let result = execute(&cli).await.unwrap();
4637 assert!(result.contains("Detached from session"));
4638 }
4639
4640 #[tokio::test]
4641 async fn test_deduplicate_session_name_no_conflict() {
4642 let base = "http://127.0.0.1:1";
4644 let client = reqwest::Client::new();
4645 let name = deduplicate_session_name(&client, base, "fresh", None).await;
4646 assert_eq!(name, "fresh");
4647 }
4648
4649 #[tokio::test]
4650 async fn test_deduplicate_session_name_with_conflict() {
4651 use axum::{Router, routing::get};
4652 use std::sync::atomic::{AtomicU32, Ordering};
4653
4654 let call_count = std::sync::Arc::new(AtomicU32::new(0));
4655 let counter = call_count.clone();
4656 let app = Router::new()
4657 .route(
4658 "/api/v1/sessions/{id}",
4659 get(move || {
4660 let c = counter.clone();
4661 async move {
4662 let n = c.fetch_add(1, Ordering::SeqCst);
4663 if n == 0 {
4664 (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4666 } else {
4667 (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4669 }
4670 }
4671 }),
4672 )
4673 .route(
4674 "/api/v1/peers",
4675 get(|| async {
4676 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4677 }),
4678 );
4679 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4680 let addr = listener.local_addr().unwrap();
4681 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4682 let base = format!("http://127.0.0.1:{}", addr.port());
4683 let client = reqwest::Client::new();
4684 let name = deduplicate_session_name(&client, &base, "repo", None).await;
4685 assert_eq!(name, "repo-2");
4686 }
4687
4688 #[test]
4691 fn test_node_needs_resolution() {
4692 assert!(!node_needs_resolution("localhost:7433"));
4693 assert!(!node_needs_resolution("mac-mini:7433"));
4694 assert!(!node_needs_resolution("10.0.0.1:7433"));
4695 assert!(!node_needs_resolution("[::1]:7433"));
4696 assert!(node_needs_resolution("mac-mini"));
4697 assert!(node_needs_resolution("linux-server"));
4698 assert!(node_needs_resolution("localhost"));
4699 }
4700
4701 #[tokio::test]
4702 async fn test_resolve_node_with_port() {
4703 let client = reqwest::Client::new();
4704 let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4705 assert_eq!(addr, "mac-mini:7433");
4706 assert!(token.is_none());
4707 }
4708
4709 #[tokio::test]
4710 async fn test_resolve_node_fallback_appends_port() {
4711 let client = reqwest::Client::new();
4714 let (addr, token) = resolve_node(&client, "unknown-host").await;
4715 assert_eq!(addr, "unknown-host:7433");
4716 assert!(token.is_none());
4717 }
4718
4719 #[cfg(not(coverage))]
4720 #[tokio::test]
4721 async fn test_resolve_node_finds_peer() {
4722 use axum::{Router, routing::get};
4723
4724 let app = Router::new()
4725 .route(
4726 "/api/v1/peers",
4727 get(|| async {
4728 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4729 }),
4730 )
4731 .route(
4732 "/api/v1/config",
4733 get(|| async {
4734 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4735 }),
4736 );
4737
4738 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4740 return;
4741 };
4742 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4743
4744 let client = reqwest::Client::new();
4745 let (addr, token) = resolve_node(&client, "mac-mini").await;
4746 assert_eq!(addr, "10.0.0.5:7433");
4747 assert_eq!(token, Some("peer-secret".into()));
4748 }
4749
4750 #[cfg(not(coverage))]
4751 #[tokio::test]
4752 async fn test_resolve_node_peer_no_token() {
4753 use axum::{Router, routing::get};
4754
4755 let app = Router::new()
4756 .route(
4757 "/api/v1/peers",
4758 get(|| async {
4759 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4760 }),
4761 )
4762 .route(
4763 "/api/v1/config",
4764 get(|| async {
4765 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4766 }),
4767 );
4768
4769 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4770 return; };
4772 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4773
4774 let client = reqwest::Client::new();
4775 let (addr, token) = resolve_node(&client, "test-peer").await;
4776 assert_eq!(addr, "10.0.0.9:7433");
4777 assert!(token.is_none()); }
4779
4780 #[tokio::test]
4781 async fn test_execute_with_peer_name_resolution() {
4782 let cli = Cli {
4786 node: "nonexistent-peer".into(),
4787 token: None,
4788 command: Some(Commands::List { all: false }),
4789 path: None,
4790 };
4791 let result = execute(&cli).await;
4792 assert!(result.is_err());
4794 }
4795
4796 #[test]
4799 fn test_cli_parse_spawn_auto() {
4800 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
4801 assert!(matches!(
4802 &cli.command,
4803 Some(Commands::Spawn { auto, .. }) if *auto
4804 ));
4805 }
4806
4807 #[test]
4808 fn test_cli_parse_spawn_auto_default() {
4809 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
4810 assert!(matches!(
4811 &cli.command,
4812 Some(Commands::Spawn { auto, .. }) if !auto
4813 ));
4814 }
4815
4816 #[tokio::test]
4817 async fn test_select_best_node_coverage_stub() {
4818 let client = reqwest::Client::new();
4820 let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4823 }
4824
4825 #[cfg(not(coverage))]
4826 #[tokio::test]
4827 async fn test_select_best_node_picks_least_loaded() {
4828 use axum::{Router, routing::get};
4829
4830 let app = Router::new().route(
4831 "/api/v1/peers",
4832 get(|| async {
4833 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4834 }),
4835 );
4836 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4837 let addr = listener.local_addr().unwrap();
4838 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4839 let base = format!("http://127.0.0.1:{}", addr.port());
4840
4841 let client = reqwest::Client::new();
4842 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4843 assert_eq!(name, "idle");
4847 assert_eq!(addr, "idle:7433");
4848 }
4849
4850 #[cfg(not(coverage))]
4851 #[tokio::test]
4852 async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4853 use axum::{Router, routing::get};
4854
4855 let app = Router::new().route(
4856 "/api/v1/peers",
4857 get(|| async {
4858 r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4859 }),
4860 );
4861 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4862 let addr = listener.local_addr().unwrap();
4863 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4864 let base = format!("http://127.0.0.1:{}", addr.port());
4865
4866 let client = reqwest::Client::new();
4867 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4868 assert_eq!(name, "my-mac");
4869 assert_eq!(addr, "localhost:7433");
4870 }
4871
4872 #[cfg(not(coverage))]
4873 #[tokio::test]
4874 async fn test_select_best_node_empty_peers_falls_back_to_local() {
4875 use axum::{Router, routing::get};
4876
4877 let app = Router::new().route(
4878 "/api/v1/peers",
4879 get(|| async {
4880 r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4881 }),
4882 );
4883 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4884 let addr = listener.local_addr().unwrap();
4885 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4886 let base = format!("http://127.0.0.1:{}", addr.port());
4887
4888 let client = reqwest::Client::new();
4889 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4890 assert_eq!(name, "solo");
4891 assert_eq!(addr, "localhost:7433");
4892 }
4893
4894 #[cfg(not(coverage))]
4895 #[tokio::test]
4896 async fn test_execute_spawn_auto_selects_node() {
4897 use axum::{
4898 Router,
4899 http::StatusCode,
4900 routing::{get, post},
4901 };
4902
4903 let create_json = test_create_response_json();
4904
4905 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4907 let addr = listener.local_addr().unwrap();
4908 let node = format!("127.0.0.1:{}", addr.port());
4909 let peer_addr = node.clone();
4910
4911 let app = Router::new()
4912 .route(
4913 "/api/v1/peers",
4914 get(move || {
4915 let peer_addr = peer_addr.clone();
4916 async move {
4917 format!(
4918 r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
4919 )
4920 }
4921 }),
4922 )
4923 .route(
4924 "/api/v1/sessions",
4925 post(move || async move {
4926 (StatusCode::CREATED, create_json.clone())
4927 }),
4928 );
4929
4930 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4931
4932 let cli = Cli {
4933 node,
4934 token: None,
4935 command: Some(Commands::Spawn {
4936 name: Some("test".into()),
4937 workdir: Some("/tmp/repo".into()),
4938 ink: None,
4939 description: None,
4940 detach: true,
4941 idle_threshold: None,
4942 auto: true,
4943 worktree: false,
4944 worktree_base: None,
4945 runtime: None,
4946 secret: vec![],
4947 command: vec!["echo".into(), "hello".into()],
4948 }),
4949 path: None,
4950 };
4951 let result = execute(&cli).await.unwrap();
4952 assert!(result.contains("Created session"));
4953 }
4954
4955 #[cfg(not(coverage))]
4956 #[tokio::test]
4957 async fn test_select_best_node_peer_no_session_count() {
4958 use axum::{Router, routing::get};
4959
4960 let app = Router::new().route(
4961 "/api/v1/peers",
4962 get(|| async {
4963 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4964 }),
4965 );
4966 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4967 let addr = listener.local_addr().unwrap();
4968 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4969 let base = format!("http://127.0.0.1:{}", addr.port());
4970
4971 let client = reqwest::Client::new();
4972 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4973 assert_eq!(name, "fresh");
4975 assert_eq!(addr, "fresh:7433");
4976 }
4977
4978 #[test]
4981 fn test_cli_parse_secret_set() {
4982 let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_TOKEN", "abc123"]).unwrap();
4983 assert!(matches!(
4984 &cli.command,
4985 Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
4986 if name == "MY_TOKEN" && value == "abc123" && env.is_none()
4987 ));
4988 }
4989
4990 #[test]
4991 fn test_cli_parse_secret_list() {
4992 let cli = Cli::try_parse_from(["pulpo", "secret", "list"]).unwrap();
4993 assert!(matches!(
4994 &cli.command,
4995 Some(Commands::Secret {
4996 action: SecretAction::List
4997 })
4998 ));
4999 }
5000
5001 #[test]
5002 fn test_cli_parse_secret_list_alias() {
5003 let cli = Cli::try_parse_from(["pulpo", "secret", "ls"]).unwrap();
5004 assert!(matches!(
5005 &cli.command,
5006 Some(Commands::Secret {
5007 action: SecretAction::List
5008 })
5009 ));
5010 }
5011
5012 #[test]
5013 fn test_cli_parse_secret_delete() {
5014 let cli = Cli::try_parse_from(["pulpo", "secret", "delete", "MY_TOKEN"]).unwrap();
5015 assert!(matches!(
5016 &cli.command,
5017 Some(Commands::Secret { action: SecretAction::Delete { name } })
5018 if name == "MY_TOKEN"
5019 ));
5020 }
5021
5022 #[test]
5023 fn test_cli_parse_secret_delete_alias() {
5024 let cli = Cli::try_parse_from(["pulpo", "secret", "rm", "MY_TOKEN"]).unwrap();
5025 assert!(matches!(
5026 &cli.command,
5027 Some(Commands::Secret { action: SecretAction::Delete { name } })
5028 if name == "MY_TOKEN"
5029 ));
5030 }
5031
5032 #[test]
5033 fn test_cli_parse_secret_alias() {
5034 let cli = Cli::try_parse_from(["pulpo", "sec", "list"]).unwrap();
5035 assert!(matches!(
5036 &cli.command,
5037 Some(Commands::Secret {
5038 action: SecretAction::List
5039 })
5040 ));
5041 }
5042
5043 #[test]
5044 fn test_format_secrets_empty() {
5045 let secrets: Vec<serde_json::Value> = vec![];
5046 assert_eq!(format_secrets(&secrets), "No secrets configured.");
5047 }
5048
5049 #[test]
5050 fn test_format_secrets_with_entries() {
5051 let secrets = vec![
5052 serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
5053 serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
5054 ];
5055 let output = format_secrets(&secrets);
5056 assert!(output.contains("GITHUB_TOKEN"));
5057 assert!(output.contains("NPM_TOKEN"));
5058 assert!(output.contains("NAME"));
5059 assert!(output.contains("ENV"));
5060 assert!(output.contains("CREATED"));
5061 }
5062
5063 #[test]
5064 fn test_format_secrets_with_env() {
5065 let secrets = vec![
5066 serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
5067 serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
5068 ];
5069 let output = format_secrets(&secrets);
5070 assert!(output.contains("GH_WORK"));
5071 assert!(output.contains("GITHUB_TOKEN"));
5072 assert!(output.contains("NPM_TOKEN"));
5073 }
5074
5075 #[test]
5076 fn test_format_secrets_short_timestamp() {
5077 let secrets = vec![serde_json::json!({"name": "KEY", "created_at": "now"})];
5078 let output = format_secrets(&secrets);
5079 assert!(output.contains("now"));
5080 }
5081
5082 #[test]
5083 fn test_format_schedules_short_last_run_at() {
5084 let schedules = vec![serde_json::json!({
5086 "name": "test",
5087 "cron": "* * * * *",
5088 "enabled": true,
5089 "last_run_at": "short",
5090 "target_node": null
5091 })];
5092 let output = format_schedules(&schedules);
5093 assert!(output.contains("short"));
5094 }
5095
5096 #[test]
5097 fn test_format_sessions_multibyte_command_truncation() {
5098 use chrono::Utc;
5099 use pulpo_common::session::SessionStatus;
5100 use uuid::Uuid;
5101
5102 let sessions = vec![Session {
5104 id: Uuid::nil(),
5105 name: "test".into(),
5106 workdir: "/tmp".into(),
5107 command: "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'".into(),
5108 description: None,
5109 status: SessionStatus::Active,
5110 exit_code: None,
5111 backend_session_id: None,
5112 output_snapshot: None,
5113 metadata: None,
5114 ink: None,
5115 intervention_code: None,
5116 intervention_reason: None,
5117 intervention_at: None,
5118 last_output_at: None,
5119 idle_since: None,
5120 idle_threshold_secs: None,
5121 worktree_path: None,
5122 worktree_branch: None,
5123 git_branch: None,
5124 git_commit: None,
5125 git_files_changed: None,
5126 git_insertions: None,
5127 git_deletions: None,
5128 git_ahead: None,
5129 runtime: Runtime::Tmux,
5130 created_at: Utc::now(),
5131 updated_at: Utc::now(),
5132 }];
5133 let output = format_sessions(&sessions);
5134 assert!(output.contains("..."));
5135 }
5136}