1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5 AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6 PeersResponse,
7};
8use pulpo_common::session::{Runtime, Session};
9
// Top-level command-line arguments for the `pulpo` client.
//
// NOTE: plain `//` comments are used on purpose in this block: clap's derive
// turns `///` doc comments into help text, which would change `--help` output.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version = env!("PULPO_VERSION"),
    args_conflicts_with_subcommands = true
)]
pub struct Cli {
    // Node to talk to. A value without a `:` is treated as a name that
    // `resolve_node` has to resolve to an address first.
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    // Explicit bearer token. When absent, `resolve_token` tries to discover
    // one, but only for localhost nodes.
    #[arg(long)]
    pub token: Option<String>,

    // Subcommand to run; `execute` prints help when neither this nor `path` is set.
    #[command(subcommand)]
    pub command: Option<Commands>,

    // Positional path. When given without a subcommand, `execute` creates a
    // session with this path as its workdir and attaches to it.
    #[arg(value_name = "PATH")]
    pub path: Option<String>,
}
33
// Subcommands of the `pulpo` CLI.
//
// NOTE: plain `//` comments are used on purpose in this block: clap's derive
// turns `///` doc comments into help text, which would change `--help` output.
#[derive(Subcommand, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    // Attach the terminal to an existing session.
    #[command(visible_alias = "a")]
    Attach {
        // Session name.
        name: String,
    },

    // Send text to a session; `execute` sends "\n" when no text is given.
    #[command(visible_alias = "i", visible_alias = "send")]
    Input {
        // Session name.
        name: String,
        // Text to send.
        text: Option<String>,
    },

    // Create a new session and, unless `--detach` is set, attach to it.
    #[command(visible_alias = "s")]
    Spawn {
        // Session name; when omitted it is derived from the workdir and de-duplicated.
        name: Option<String>,

        // Working directory; defaults to the current directory.
        #[arg(long)]
        workdir: Option<String>,

        // Sent to the API as the `ink` field.
        #[arg(long)]
        ink: Option<String>,

        // Sent to the API as the `description` field.
        #[arg(long)]
        description: Option<String>,

        // Do not attach after creating the session.
        #[arg(short, long)]
        detach: bool,

        // Sent to the API as `idle_threshold_secs`.
        #[arg(long)]
        idle_threshold: Option<u32>,

        // Let `select_best_node` choose the node to spawn on.
        #[arg(long)]
        auto: bool,

        // Ask the API to create the session in a worktree.
        #[arg(long)]
        worktree: bool,

        // Sent to the API as the `runtime` field.
        #[arg(long)]
        runtime: Option<String>,

        // Secret names, sent to the API as the `secrets` array (repeatable).
        #[arg(long)]
        secret: Vec<String>,

        // Command to run, given after `--`; the parts are joined with spaces.
        #[arg(last = true)]
        command: Vec<String>,
    },

    // List sessions.
    #[command(visible_alias = "ls")]
    List {
        // Without this flag only creating/active/idle/ready sessions are requested.
        #[arg(short, long)]
        all: bool,
    },

    // Print the output of a session.
    #[command(visible_alias = "l")]
    Logs {
        // Session name.
        name: String,

        // Number of output lines to request.
        #[arg(long, default_value = "100")]
        lines: usize,

        // Keep polling and printing new output (see `follow_logs`).
        #[arg(short, long)]
        follow: bool,
    },

    // Kill one or more sessions.
    #[command(visible_alias = "k")]
    Kill {
        // Session names (at least one).
        #[arg(required = true)]
        names: Vec<String>,
    },

    // Delete one or more sessions.
    #[command(visible_alias = "rm")]
    Delete {
        // Session names (at least one).
        #[arg(required = true)]
        names: Vec<String>,
    },

    // POST to the sessions `cleanup` endpoint and report how many were deleted.
    Cleanup,

    // POST to a session's `resume` endpoint.
    #[command(visible_alias = "r")]
    Resume {
        // Session name.
        name: String,
    },

    // Show the local node and its peers.
    #[command(visible_alias = "n")]
    Nodes,

    // List intervention events of a session.
    #[command(visible_alias = "iv")]
    Interventions {
        // Session name.
        name: String,
    },

    // Open the node's base URL in the browser.
    Ui,

    // Manage schedules (see `ScheduleAction`).
    #[command(visible_alias = "sched")]
    Schedule {
        #[command(subcommand)]
        action: ScheduleAction,
    },

    // Manage secrets (see `SecretAction`).
    #[command(visible_alias = "sec")]
    Secret {
        #[command(subcommand)]
        action: SecretAction,
    },
}
177
// Actions of the `secret` subcommand, carried out by `execute_secret`.
//
// NOTE: plain `//` comments are used on purpose in this block: clap's derive
// turns `///` doc comments into help text, which would change `--help` output.
#[derive(Subcommand, Debug)]
pub enum SecretAction {
    // Store a secret (PUT to `/api/v1/secrets/{name}`).
    Set {
        // Secret name; becomes part of the request URL.
        name: String,
        // Secret value, sent as the `value` field.
        value: String,
        // Optional value sent as the `env` field.
        #[arg(long)]
        env: Option<String>,
    },
    // List the configured secrets.
    #[command(visible_alias = "ls")]
    List,
    // Delete a secret by name.
    #[command(visible_alias = "rm")]
    Delete {
        // Secret name.
        name: String,
    },
}
200
// Actions of the `schedule` subcommand, carried out by `execute_schedule`.
//
// NOTE: plain `//` comments are used on purpose in this block: clap's derive
// turns `///` doc comments into help text, which would change `--help` output.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    // Create a schedule (POST to `/api/v1/schedules`).
    #[command(alias = "install")]
    Add {
        // Schedule name.
        name: String,
        // Cron expression, sent as the `cron` field.
        cron: String,
        // Working directory; defaults to the current directory.
        #[arg(long)]
        workdir: Option<String>,
        // Sent to the API as `target_node`.
        #[arg(long)]
        node: Option<String>,
        // Sent to the API as the `ink` field.
        #[arg(long)]
        ink: Option<String>,
        // Sent to the API as the `description` field.
        #[arg(long)]
        description: Option<String>,
        // Command to run, given after `--`; the parts are joined with spaces.
        #[arg(last = true)]
        command: Vec<String>,
    },
    // List schedules.
    #[command(alias = "ls")]
    List,
    // Remove a schedule by name.
    #[command(alias = "rm")]
    Remove {
        // Schedule name.
        name: String,
    },
    // Disable a schedule (sends `enabled: false`).
    Pause {
        // Schedule name.
        name: String,
    },
    // Re-enable a schedule (sends `enabled: true`).
    Resume {
        // Schedule name.
        name: String,
    },
}
246
/// Text that `follow_logs` looks for in session output; once it appears,
/// following stops.
const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
249
/// Turns `path` into an absolute path string.
///
/// Absolute paths are returned unchanged. Relative paths are joined onto the
/// current working directory; if that directory cannot be determined the
/// input is returned as-is.
fn resolve_path(path: &str) -> String {
    let candidate = std::path::Path::new(path);
    if !candidate.is_absolute() {
        if let Ok(cwd) = std::env::current_dir() {
            return cwd.join(candidate).to_string_lossy().into_owned();
        }
    }
    path.to_owned()
}
262
/// Derives a kebab-case session name from the last component of `path`.
///
/// ASCII letters and digits are kept (lower-cased); every run of other
/// characters becomes a single `-`, and leading/trailing dashes are dropped.
/// Falls back to `"session"` when the path has no usable file name or nothing
/// survives the cleanup.
fn derive_session_name(path: &str) -> String {
    let basename = match std::path::Path::new(path)
        .file_name()
        .and_then(|n| n.to_str())
    {
        Some(n) => n,
        None => "session",
    };

    // Splitting on every non-alphanumeric character and discarding the empty
    // pieces collapses separator runs and trims the edges in one pass.
    let slug = basename
        .split(|c: char| !c.is_ascii_alphanumeric())
        .filter(|piece| !piece.is_empty())
        .map(str::to_ascii_lowercase)
        .collect::<Vec<_>>()
        .join("-");

    if slug.is_empty() {
        String::from("session")
    } else {
        slug
    }
}
295
296async fn deduplicate_session_name(
298 client: &reqwest::Client,
299 base: &str,
300 name: &str,
301 token: Option<&str>,
302) -> String {
303 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
305 .send()
306 .await;
307 match resp {
308 Ok(r) if r.status().is_success() => {
309 for i in 2..=99 {
311 let candidate = format!("{name}-{i}");
312 let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
313 .send()
314 .await;
315 match resp {
316 Ok(r) if r.status().is_success() => {}
317 _ => return candidate,
318 }
319 }
320 format!("{name}-100")
321 }
322 _ => name.to_owned(),
323 }
324}
325
/// Builds the HTTP base URL for `node` by prefixing it with `http://`.
pub fn base_url(node: &str) -> String {
    let mut url = String::from("http://");
    url.push_str(node);
    url
}
330
/// JSON body of the session output endpoint, as deserialized by `fetch_output`.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// The session output text.
    output: String,
}
336
337const fn session_runtime(session: &Session) -> &'static str {
339 match session.runtime {
340 Runtime::Tmux => "tmux",
341 Runtime::Docker => "docker",
342 }
343}
344
345fn format_sessions(sessions: &[Session]) -> String {
346 if sessions.is_empty() {
347 return "No sessions.".into();
348 }
349 let mut lines = vec![format!(
350 "{:<10} {:<24} {:<12} {:<8} {}",
351 "ID", "NAME", "STATUS", "RUNTIME", "COMMAND"
352 )];
353 for s in sessions {
354 let cmd_display = if s.command.len() > 50 {
355 let truncated: String = s.command.chars().take(47).collect();
356 format!("{truncated}...")
357 } else {
358 s.command.clone()
359 };
360 let short_id = &s.id.to_string()[..8];
361 let has_pr = s
362 .metadata
363 .as_ref()
364 .is_some_and(|m| m.contains_key("pr_url"));
365 let name_display = match (s.worktree_path.is_some(), has_pr) {
366 (true, true) => format!("{} [wt] [PR]", s.name),
367 (true, false) => format!("{} [wt]", s.name),
368 (false, true) => format!("{} [PR]", s.name),
369 (false, false) => s.name.clone(),
370 };
371 lines.push(format!(
372 "{:<10} {:<24} {:<12} {:<8} {}",
373 short_id,
374 name_display,
375 s.status,
376 session_runtime(s),
377 cmd_display
378 ));
379 }
380 lines.join("\n")
381}
382
383fn format_nodes(resp: &PeersResponse) -> String {
385 let mut lines = vec![format!(
386 "{:<20} {:<25} {:<10} {}",
387 "NAME", "ADDRESS", "STATUS", "SESSIONS"
388 )];
389 lines.push(format!(
390 "{:<20} {:<25} {:<10} {}",
391 resp.local.name, "(local)", "online", "-"
392 ));
393 for p in &resp.peers {
394 let sessions = p
395 .session_count
396 .map_or_else(|| "-".into(), |c| c.to_string());
397 lines.push(format!(
398 "{:<20} {:<25} {:<10} {}",
399 p.name, p.address, p.status, sessions
400 ));
401 }
402 lines.join("\n")
403}
404
405fn format_interventions(events: &[InterventionEventResponse]) -> String {
407 if events.is_empty() {
408 return "No intervention events.".into();
409 }
410 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
411 for e in events {
412 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
413 }
414 lines.join("\n")
415}
416
/// Builds (without running) the platform command that opens `url` in the
/// default browser: `open` on macOS, `cmd /C start` on Windows and `xdg-open`
/// on Linux and every other target.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    let program = if cfg!(target_os = "macos") {
        "open"
    } else if cfg!(target_os = "windows") {
        "cmd"
    } else {
        "xdg-open"
    };

    let mut cmd = std::process::Command::new(program);
    if cfg!(target_os = "windows") {
        // On Windows the URL is handed to `start` via the command interpreter.
        cmd.args(["/C", "start"]);
    }
    cmd.arg(url);
    cmd
}
446
447#[cfg(not(coverage))]
449fn open_browser(url: &str) -> Result<()> {
450 build_open_command(url).status()?;
451 Ok(())
452}
453
454#[cfg(coverage)]
456fn open_browser(_url: &str) -> Result<()> {
457 Ok(())
458}
459
/// Builds (without running) the command that attaches the terminal to a
/// backend session.
///
/// Ids of the form `docker:<container>` become `docker exec -it <container>
/// /bin/sh`. Any other id is attached with `tmux attach-session -t <id>`,
/// except on Windows, where the command only echoes a notice that attach is
/// unavailable.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    let (program, args): (&str, Vec<&str>) =
        if let Some(container) = backend_session_id.strip_prefix("docker:") {
            ("docker", vec!["exec", "-it", container, "/bin/sh"])
        } else if cfg!(target_os = "windows") {
            (
                "cmd",
                vec![
                    "/C",
                    "echo",
                    "Attach not available on Windows. Use the web UI or --runtime docker.",
                ],
            )
        } else {
            ("tmux", vec!["attach-session", "-t", backend_session_id])
        };

    let mut cmd = std::process::Command::new(program);
    cmd.args(args);
    cmd
}
489
490#[cfg(not(any(test, coverage, target_os = "windows")))]
492fn attach_session(backend_session_id: &str) -> Result<()> {
493 let status = build_attach_command(backend_session_id).status()?;
494 if !status.success() {
495 anyhow::bail!("attach failed with {status}");
496 }
497 Ok(())
498}
499
500#[cfg(all(target_os = "windows", not(test), not(coverage)))]
502fn attach_session(_backend_session_id: &str) -> Result<()> {
503 eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
504 Ok(())
505}
506
507#[cfg(any(test, coverage))]
509#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
510fn attach_session(_backend_session_id: &str) -> Result<()> {
511 Ok(())
512}
513
514fn api_error(text: &str) -> anyhow::Error {
516 serde_json::from_str::<serde_json::Value>(text)
517 .ok()
518 .and_then(|v| v["error"].as_str().map(String::from))
519 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
520}
521
522async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
524 if resp.status().is_success() {
525 Ok(resp.text().await?)
526 } else {
527 let text = resp.text().await?;
528 Err(api_error(&text))
529 }
530}
531
532fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
534 if err.is_connect() {
535 anyhow::anyhow!(
536 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
537 )
538 } else {
539 anyhow::anyhow!("Network error connecting to {node}: {err}")
540 }
541}
542
/// Reports whether `node` refers to the local machine.
///
/// Recognized forms are `localhost` and `127.0.0.1` (with or without a
/// `:port` suffix), the bare IPv6 loopback `::1`, and anything starting with
/// the bracketed form `[::1]`.
fn is_localhost(node: &str) -> bool {
    // Everything before the first ':' is the host; without a ':' the whole
    // string is.
    let host = node.split_once(':').map_or(node, |(head, _)| head);
    matches!(host, "localhost" | "127.0.0.1") || node == "::1" || node.starts_with("[::1]")
}
548
549async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
551 let resp = client
552 .get(format!("{base}/api/v1/auth/token"))
553 .send()
554 .await
555 .ok()?;
556 let body: AuthTokenResponse = resp.json().await.ok()?;
557 if body.token.is_empty() {
558 None
559 } else {
560 Some(body.token)
561 }
562}
563
564async fn resolve_token(
566 client: &reqwest::Client,
567 base: &str,
568 node: &str,
569 explicit: Option<&str>,
570) -> Option<String> {
571 if let Some(t) = explicit {
572 return Some(t.to_owned());
573 }
574 if is_localhost(node) {
575 return discover_token(client, base).await;
576 }
577 None
578}
579
/// Reports whether `node` is a bare name, i.e. contains no `:`, and therefore
/// still has to be resolved to an address.
fn node_needs_resolution(node: &str) -> bool {
    node.find(':').is_none()
}
584
585#[cfg(not(coverage))]
592async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
593 if !node_needs_resolution(node) {
595 return (node.to_owned(), None);
596 }
597
598 let local_base = "http://localhost:7433";
600 let mut resolved_address: Option<String> = None;
601
602 if let Ok(resp) = client
603 .get(format!("{local_base}/api/v1/peers"))
604 .send()
605 .await
606 && let Ok(peers_resp) = resp.json::<PeersResponse>().await
607 {
608 for peer in &peers_resp.peers {
609 if peer.name == node {
610 resolved_address = Some(peer.address.clone());
611 break;
612 }
613 }
614 }
615
616 let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
617
618 let peer_token = if let Ok(resp) = client
620 .get(format!("{local_base}/api/v1/config"))
621 .send()
622 .await
623 && let Ok(config) = resp.json::<ConfigResponse>().await
624 && let Some(entry) = config.peers.get(node)
625 {
626 entry.token().map(String::from)
627 } else {
628 None
629 };
630
631 (address, peer_token)
632}
633
/// Coverage-build variant of `resolve_node`: performs no network lookups,
/// appends `:7433` to bare names and never yields a token.
#[cfg(coverage)]
async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
    let address = if node_needs_resolution(node) {
        format!("{node}:7433")
    } else {
        node.to_owned()
    };
    (address, None)
}
643
644fn authed_get(
646 client: &reqwest::Client,
647 url: String,
648 token: Option<&str>,
649) -> reqwest::RequestBuilder {
650 let req = client.get(url);
651 if let Some(t) = token {
652 req.bearer_auth(t)
653 } else {
654 req
655 }
656}
657
658fn authed_post(
660 client: &reqwest::Client,
661 url: String,
662 token: Option<&str>,
663) -> reqwest::RequestBuilder {
664 let req = client.post(url);
665 if let Some(t) = token {
666 req.bearer_auth(t)
667 } else {
668 req
669 }
670}
671
672fn authed_delete(
674 client: &reqwest::Client,
675 url: String,
676 token: Option<&str>,
677) -> reqwest::RequestBuilder {
678 let req = client.delete(url);
679 if let Some(t) = token {
680 req.bearer_auth(t)
681 } else {
682 req
683 }
684}
685
686#[cfg(not(coverage))]
688fn authed_put(
689 client: &reqwest::Client,
690 url: String,
691 token: Option<&str>,
692) -> reqwest::RequestBuilder {
693 let req = client.put(url);
694 if let Some(t) = token {
695 req.bearer_auth(t)
696 } else {
697 req
698 }
699}
700
701async fn fetch_output(
703 client: &reqwest::Client,
704 base: &str,
705 name: &str,
706 lines: usize,
707 token: Option<&str>,
708) -> Result<String> {
709 let resp = authed_get(
710 client,
711 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
712 token,
713 )
714 .send()
715 .await?;
716 let text = ok_or_api_error(resp).await?;
717 let output: OutputResponse = serde_json::from_str(&text)?;
718 Ok(output.output)
719}
720
721async fn fetch_session_status(
723 client: &reqwest::Client,
724 base: &str,
725 name: &str,
726 token: Option<&str>,
727) -> Result<String> {
728 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
729 .send()
730 .await?;
731 let text = ok_or_api_error(resp).await?;
732 let session: Session = serde_json::from_str(&text)?;
733 Ok(session.status.to_string())
734}
735
/// Polls a freshly created session to detect a command that died right away.
///
/// Makes up to three attempts, 500 ms apart. A status of `lost` or `killed`
/// produces an error that points the user at `pulpo logs`. Any status other
/// than `creating` returns `Ok(())` immediately. The check is best-effort:
/// if the session cannot be fetched or parsed, or it is still `creating`
/// after the last attempt, the function also returns `Ok(())`.
async fn check_session_alive(
    client: &reqwest::Client,
    base: &str,
    session_id: &str,
    token: Option<&str>,
) -> Result<()> {
    for _ in 0..3 {
        // Wait before every poll, including the first one.
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        let resp = authed_get(
            client,
            format!("{base}/api/v1/sessions/{session_id}"),
            token,
        )
        .send()
        .await;
        if let Ok(resp) = resp
            && let Ok(text) = ok_or_api_error(resp).await
            && let Ok(session) = serde_json::from_str::<Session>(&text)
        {
            match session.status.to_string().as_str() {
                // Not started yet: poll again.
                "creating" => continue,
                "lost" | "killed" => {
                    anyhow::bail!(
                        "Session \"{}\" exited immediately — the command may have failed.\n Check logs: pulpo logs {}",
                        session.name,
                        session.name
                    );
                }
                // Any other status counts as alive.
                _ => return Ok(()),
            }
        }
        // Reached only when the fetch or parse failed: stop polling and let
        // the caller proceed.
        break;
    }
    Ok(())
}
777
/// Returns the part of `new` that was not already present at the end of `prev`.
///
/// Both arguments are snapshots of the same output. The function searches
/// `new`, from the bottom up, for the last line of `prev`, and accepts a
/// candidate only if the lines leading up to it also match the tail of
/// `prev`. Everything after the accepted line is returned.
///
/// * `prev` empty: all of `new` is returned.
/// * `new` has no lines: `""` is returned.
/// * No overlap found: all of `new` is returned.
/// * `new` ends exactly where `prev` ended: `""` is returned.
///
/// The returned slice borrows from `new` and keeps its original line
/// terminators, so both `\n` and `\r\n` output are handled.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // A non-empty string always yields at least one line, so this only
    // guards against a panic instead of indexing blindly.
    let Some(&last_prev) = prev_lines.last() else {
        return new;
    };

    // Scan from the bottom so the most recent occurrence of the anchor line wins.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Confirm the match by comparing as many preceding lines as both
        // snapshots have available.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail != new_overlap {
            continue;
        }
        if i + 1 == new_lines.len() {
            return "";
        }
        // Byte offset of the first unseen line. Summing the raw segments
        // (terminator included) is exact for "\n" and "\r\n" alike, whereas
        // assuming a one-byte terminator cuts into "\r\n" output.
        let consumed: usize = new
            .split_inclusive('\n')
            .take(i + 1)
            .map(str::len)
            .sum();
        return new.get(consumed..).unwrap_or("");
    }

    new
}
820
821async fn follow_logs(
823 client: &reqwest::Client,
824 base: &str,
825 name: &str,
826 lines: usize,
827 token: Option<&str>,
828 writer: &mut (dyn std::io::Write + Send),
829) -> Result<()> {
830 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
831 write!(writer, "{prev_output}")?;
832
833 let mut unchanged_ticks: u32 = 0;
834
835 loop {
836 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
837
838 let new_output = fetch_output(client, base, name, lines, token).await?;
840
841 let diff = diff_output(&prev_output, &new_output);
842 if diff.is_empty() {
843 unchanged_ticks += 1;
844 } else {
845 write!(writer, "{diff}")?;
846 unchanged_ticks = 0;
847 }
848
849 if new_output.contains(AGENT_EXIT_MARKER) {
851 break;
852 }
853
854 prev_output = new_output;
855
856 if unchanged_ticks >= 3 {
858 let status = fetch_session_status(client, base, name, token).await?;
859 let is_terminal = status == "ready" || status == "killed" || status == "lost";
860 if is_terminal {
861 break;
862 }
863 }
864 }
865 Ok(())
866}
867
868#[cfg(not(coverage))]
872async fn execute_schedule(
873 client: &reqwest::Client,
874 action: &ScheduleAction,
875 base: &str,
876 token: Option<&str>,
877) -> Result<String> {
878 match action {
879 ScheduleAction::Add {
880 name,
881 cron,
882 workdir,
883 node,
884 ink,
885 description,
886 command,
887 } => {
888 let cmd = if command.is_empty() {
889 None
890 } else {
891 Some(command.join(" "))
892 };
893 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
894 std::env::current_dir()
895 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
896 });
897 let mut body = serde_json::json!({
898 "name": name,
899 "cron": cron,
900 "workdir": resolved_workdir,
901 });
902 if let Some(c) = &cmd {
903 body["command"] = serde_json::json!(c);
904 }
905 if let Some(n) = node {
906 body["target_node"] = serde_json::json!(n);
907 }
908 if let Some(i) = ink {
909 body["ink"] = serde_json::json!(i);
910 }
911 if let Some(d) = description {
912 body["description"] = serde_json::json!(d);
913 }
914 let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
915 .json(&body)
916 .send()
917 .await?;
918 ok_or_api_error(resp).await?;
919 Ok(format!("Created schedule \"{name}\""))
920 }
921 ScheduleAction::List => {
922 let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
923 .send()
924 .await?;
925 let text = ok_or_api_error(resp).await?;
926 let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
927 Ok(format_schedules(&schedules))
928 }
929 ScheduleAction::Remove { name } => {
930 let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
931 .send()
932 .await?;
933 ok_or_api_error(resp).await?;
934 Ok(format!("Removed schedule \"{name}\""))
935 }
936 ScheduleAction::Pause { name } => {
937 let body = serde_json::json!({ "enabled": false });
938 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
939 .json(&body)
940 .send()
941 .await?;
942 ok_or_api_error(resp).await?;
943 Ok(format!("Paused schedule \"{name}\""))
944 }
945 ScheduleAction::Resume { name } => {
946 let body = serde_json::json!({ "enabled": true });
947 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
948 .json(&body)
949 .send()
950 .await?;
951 ok_or_api_error(resp).await?;
952 Ok(format!("Resumed schedule \"{name}\""))
953 }
954 }
955}
956
/// Coverage-build stand-in for `execute_schedule`: ignores its arguments,
/// performs no requests and returns an empty string.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_schedule(
    _client: &reqwest::Client,
    _action: &ScheduleAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::new())
}
968
969#[cfg_attr(coverage, allow(dead_code))]
973fn format_secrets(secrets: &[serde_json::Value]) -> String {
974 if secrets.is_empty() {
975 return "No secrets configured.".into();
976 }
977 let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
978 for s in secrets {
979 let name = s["name"].as_str().unwrap_or("?");
980 let env_display = s["env"]
981 .as_str()
982 .map_or_else(|| name.to_owned(), String::from);
983 let created = s["created_at"]
984 .as_str()
985 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
986 lines.push(format!("{name:<24} {env_display:<24} {created}"));
987 }
988 lines.join("\n")
989}
990
991#[cfg(not(coverage))]
993async fn execute_secret(
994 client: &reqwest::Client,
995 action: &SecretAction,
996 base: &str,
997 token: Option<&str>,
998) -> Result<String> {
999 match action {
1000 SecretAction::Set { name, value, env } => {
1001 let mut body = serde_json::json!({ "value": value });
1002 if let Some(e) = env {
1003 body["env"] = serde_json::json!(e);
1004 }
1005 let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
1006 .json(&body)
1007 .send()
1008 .await?;
1009 ok_or_api_error(resp).await?;
1010 Ok(format!("Secret \"{name}\" set."))
1011 }
1012 SecretAction::List => {
1013 let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
1014 .send()
1015 .await?;
1016 let text = ok_or_api_error(resp).await?;
1017 let parsed: serde_json::Value = serde_json::from_str(&text)?;
1018 let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1019 Ok(format_secrets(secrets))
1020 }
1021 SecretAction::Delete { name } => {
1022 let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1023 .send()
1024 .await?;
1025 ok_or_api_error(resp).await?;
1026 Ok(format!("Secret \"{name}\" deleted."))
1027 }
1028 }
1029}
1030
/// Coverage-build stand-in for `execute_secret`: ignores its arguments,
/// performs no requests and returns an empty string.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_secret(
    _client: &reqwest::Client,
    _action: &SecretAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::new())
}
1042
1043#[cfg_attr(coverage, allow(dead_code))]
1045fn format_schedules(schedules: &[serde_json::Value]) -> String {
1046 if schedules.is_empty() {
1047 return "No schedules.".into();
1048 }
1049 let mut lines = vec![format!(
1050 "{:<20} {:<18} {:<8} {:<12} {}",
1051 "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1052 )];
1053 for s in schedules {
1054 let name = s["name"].as_str().unwrap_or("?");
1055 let cron = s["cron"].as_str().unwrap_or("?");
1056 let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1057 "yes"
1058 } else {
1059 "no"
1060 };
1061 let last_run = s["last_run_at"]
1062 .as_str()
1063 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1064 let node = s["target_node"].as_str().unwrap_or("local");
1065 lines.push(format!(
1066 "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1067 ));
1068 }
1069 lines.join("\n")
1070}
1071
1072#[cfg(not(coverage))]
1076async fn select_best_node(
1077 client: &reqwest::Client,
1078 base: &str,
1079 token: Option<&str>,
1080) -> Result<(String, String)> {
1081 let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1082 .send()
1083 .await?;
1084 let text = ok_or_api_error(resp).await?;
1085 let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1086
1087 let mut best: Option<(String, String, f64)> = None; for peer in &peers_resp.peers {
1091 if peer.status != pulpo_common::peer::PeerStatus::Online {
1092 continue;
1093 }
1094 let sessions = peer.session_count.unwrap_or(0);
1095 let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1096 #[allow(clippy::cast_precision_loss)]
1098 let score = sessions as f64 - (mem as f64 / 1024.0);
1099 if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1100 best = Some((peer.address.clone(), peer.name.clone(), score));
1101 }
1102 }
1103
1104 match best {
1106 Some((addr, name, _)) => Ok((addr, name)),
1107 None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1108 }
1109}
1110
/// Coverage-build stand-in for `select_best_node`: performs no request and
/// always returns `("localhost:7433", "local")`.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn select_best_node(
    _client: &reqwest::Client,
    _base: &str,
    _token: Option<&str>,
) -> Result<(String, String)> {
    Ok(("localhost:7433".into(), "local".into()))
}
1121
1122#[allow(clippy::too_many_lines)]
1124pub async fn execute(cli: &Cli) -> Result<String> {
1125 let client = reqwest::Client::new();
1126 let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
1127 let url = base_url(&resolved_node);
1128 let node = &resolved_node;
1129 let token = resolve_token(&client, &url, node, cli.token.as_deref())
1130 .await
1131 .or(peer_token);
1132
1133 if cli.command.is_none() && cli.path.is_none() {
1135 use clap::CommandFactory;
1137 let mut cmd = Cli::command();
1138 cmd.print_help()?;
1139 println!();
1140 return Ok(String::new());
1141 }
1142 if cli.command.is_none() {
1143 let path = cli.path.as_deref().unwrap_or(".");
1144 let resolved_workdir = resolve_path(path);
1145 let base_name = derive_session_name(&resolved_workdir);
1146 let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
1147 let body = serde_json::json!({
1148 "name": name,
1149 "workdir": resolved_workdir,
1150 });
1151 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1152 .json(&body)
1153 .send()
1154 .await
1155 .map_err(|e| friendly_error(&e, node))?;
1156 let text = ok_or_api_error(resp).await?;
1157 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1158 let msg = format!(
1159 "Created session \"{}\" ({})",
1160 resp.session.name, resp.session.id
1161 );
1162 let backend_id = resp
1163 .session
1164 .backend_session_id
1165 .as_deref()
1166 .unwrap_or(&resp.session.name);
1167 eprintln!("{msg}");
1168 attach_session(backend_id)?;
1171 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1172 }
1173
1174 match cli.command.as_ref().unwrap() {
1175 Commands::Attach { name } => {
1176 let resp = authed_get(
1178 &client,
1179 format!("{url}/api/v1/sessions/{name}"),
1180 token.as_deref(),
1181 )
1182 .send()
1183 .await
1184 .map_err(|e| friendly_error(&e, node))?;
1185 let text = ok_or_api_error(resp).await?;
1186 let session: Session = serde_json::from_str(&text)?;
1187 match session.status.to_string().as_str() {
1188 "lost" => {
1189 anyhow::bail!(
1190 "Session \"{name}\" is lost (agent process died). Resume it first:\n pulpo resume {name}"
1191 );
1192 }
1193 "killed" => {
1194 anyhow::bail!(
1195 "Session \"{name}\" is {} — cannot attach to a killed session.",
1196 session.status
1197 );
1198 }
1199 _ => {}
1200 }
1201 let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1202 attach_session(&backend_id)?;
1203 Ok(format!("Detached from session {name}."))
1204 }
1205 Commands::Input { name, text } => {
1206 let input_text = text.as_deref().unwrap_or("\n");
1207 let body = serde_json::json!({ "text": input_text });
1208 let resp = authed_post(
1209 &client,
1210 format!("{url}/api/v1/sessions/{name}/input"),
1211 token.as_deref(),
1212 )
1213 .json(&body)
1214 .send()
1215 .await
1216 .map_err(|e| friendly_error(&e, node))?;
1217 ok_or_api_error(resp).await?;
1218 Ok(format!("Sent input to session {name}."))
1219 }
1220 Commands::List { all } => {
1221 let list_url = if *all {
1222 format!("{url}/api/v1/sessions")
1223 } else {
1224 format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
1225 };
1226 let resp = authed_get(&client, list_url, token.as_deref())
1227 .send()
1228 .await
1229 .map_err(|e| friendly_error(&e, node))?;
1230 let text = ok_or_api_error(resp).await?;
1231 let sessions: Vec<Session> = serde_json::from_str(&text)?;
1232 Ok(format_sessions(&sessions))
1233 }
1234 Commands::Nodes => {
1235 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1236 .send()
1237 .await
1238 .map_err(|e| friendly_error(&e, node))?;
1239 let text = ok_or_api_error(resp).await?;
1240 let resp: PeersResponse = serde_json::from_str(&text)?;
1241 Ok(format_nodes(&resp))
1242 }
1243 Commands::Spawn {
1244 workdir,
1245 name,
1246 ink,
1247 description,
1248 detach,
1249 idle_threshold,
1250 auto,
1251 worktree,
1252 runtime,
1253 secret,
1254 command,
1255 } => {
1256 let cmd = if command.is_empty() {
1257 None
1258 } else {
1259 Some(command.join(" "))
1260 };
1261 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1263 std::env::current_dir()
1264 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1265 });
1266 let resolved_name = if let Some(n) = name {
1268 n.clone()
1269 } else {
1270 let base_name = derive_session_name(&resolved_workdir);
1271 deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1272 };
1273 let mut body = serde_json::json!({
1274 "name": resolved_name,
1275 "workdir": resolved_workdir,
1276 });
1277 if let Some(c) = &cmd {
1278 body["command"] = serde_json::json!(c);
1279 }
1280 if let Some(i) = ink {
1281 body["ink"] = serde_json::json!(i);
1282 }
1283 if let Some(d) = description {
1284 body["description"] = serde_json::json!(d);
1285 }
1286 if let Some(t) = idle_threshold {
1287 body["idle_threshold_secs"] = serde_json::json!(t);
1288 }
1289 if *worktree {
1290 body["worktree"] = serde_json::json!(true);
1291 eprintln!(
1292 "Worktree: branch pulpo/{resolved_name} in {resolved_workdir}/.pulpo/worktrees/{resolved_name}/"
1293 );
1294 }
1295 if let Some(rt) = runtime {
1296 body["runtime"] = serde_json::json!(rt);
1297 }
1298 if !secret.is_empty() {
1299 body["secrets"] = serde_json::json!(secret);
1300 }
1301 let spawn_url = if *auto {
1302 let (auto_addr, auto_name) =
1303 select_best_node(&client, &url, token.as_deref()).await?;
1304 eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1305 base_url(&auto_addr)
1306 } else {
1307 url.clone()
1308 };
1309 let resp = authed_post(
1310 &client,
1311 format!("{spawn_url}/api/v1/sessions"),
1312 token.as_deref(),
1313 )
1314 .json(&body)
1315 .send()
1316 .await
1317 .map_err(|e| friendly_error(&e, node))?;
1318 let text = ok_or_api_error(resp).await?;
1319 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1320 let msg = format!(
1321 "Created session \"{}\" ({})",
1322 resp.session.name, resp.session.id
1323 );
1324 if !detach {
1325 let backend_id = resp
1326 .session
1327 .backend_session_id
1328 .as_deref()
1329 .unwrap_or(&resp.session.name);
1330 eprintln!("{msg}");
1331 if cmd.is_some() {
1334 let sid = resp.session.id.to_string();
1335 check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1336 }
1337 attach_session(backend_id)?;
1338 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1339 }
1340 Ok(msg)
1341 }
1342 Commands::Kill { names } => {
1343 let mut results = Vec::new();
1344 for name in names {
1345 let resp = authed_post(
1346 &client,
1347 format!("{url}/api/v1/sessions/{name}/kill"),
1348 token.as_deref(),
1349 )
1350 .send()
1351 .await
1352 .map_err(|e| friendly_error(&e, node))?;
1353 match ok_or_api_error(resp).await {
1354 Ok(_) => results.push(format!("Session {name} killed.")),
1355 Err(e) => results.push(format!("Error killing {name}: {e}")),
1356 }
1357 }
1358 Ok(results.join("\n"))
1359 }
1360 Commands::Delete { names } => {
1361 let mut results = Vec::new();
1362 for name in names {
1363 let resp = authed_delete(
1364 &client,
1365 format!("{url}/api/v1/sessions/{name}"),
1366 token.as_deref(),
1367 )
1368 .send()
1369 .await
1370 .map_err(|e| friendly_error(&e, node))?;
1371 match ok_or_api_error(resp).await {
1372 Ok(_) => results.push(format!("Session {name} deleted.")),
1373 Err(e) => results.push(format!("Error deleting {name}: {e}")),
1374 }
1375 }
1376 Ok(results.join("\n"))
1377 }
1378 Commands::Cleanup => {
1379 let resp = authed_post(
1380 &client,
1381 format!("{url}/api/v1/sessions/cleanup"),
1382 token.as_deref(),
1383 )
1384 .send()
1385 .await
1386 .map_err(|e| friendly_error(&e, node))?;
1387 let text = ok_or_api_error(resp).await?;
1388 let result: serde_json::Value = serde_json::from_str(&text)?;
1389 let count = result["deleted"].as_u64().unwrap_or(0);
1390 if count == 0 {
1391 Ok("No killed or lost sessions to clean up.".into())
1392 } else {
1393 Ok(format!("Cleaned up {count} session(s)."))
1394 }
1395 }
1396 Commands::Logs {
1397 name,
1398 lines,
1399 follow,
1400 } => {
1401 if *follow {
1402 let mut stdout = std::io::stdout();
1403 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1404 .await
1405 .map_err(|e| {
1406 match e.downcast::<reqwest::Error>() {
1408 Ok(re) => friendly_error(&re, node),
1409 Err(other) => other,
1410 }
1411 })?;
1412 Ok(String::new())
1413 } else {
1414 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1415 .await
1416 .map_err(|e| match e.downcast::<reqwest::Error>() {
1417 Ok(re) => friendly_error(&re, node),
1418 Err(other) => other,
1419 })?;
1420 Ok(output)
1421 }
1422 }
1423 Commands::Interventions { name } => {
1424 let resp = authed_get(
1425 &client,
1426 format!("{url}/api/v1/sessions/{name}/interventions"),
1427 token.as_deref(),
1428 )
1429 .send()
1430 .await
1431 .map_err(|e| friendly_error(&e, node))?;
1432 let text = ok_or_api_error(resp).await?;
1433 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1434 Ok(format_interventions(&events))
1435 }
1436 Commands::Ui => {
1437 let dashboard = base_url(node);
1438 open_browser(&dashboard)?;
1439 Ok(format!("Opening {dashboard}"))
1440 }
1441 Commands::Resume { name } => {
1442 let resp = authed_post(
1443 &client,
1444 format!("{url}/api/v1/sessions/{name}/resume"),
1445 token.as_deref(),
1446 )
1447 .send()
1448 .await
1449 .map_err(|e| friendly_error(&e, node))?;
1450 let text = ok_or_api_error(resp).await?;
1451 let session: Session = serde_json::from_str(&text)?;
1452 let backend_id = session
1453 .backend_session_id
1454 .as_deref()
1455 .unwrap_or(&session.name);
1456 eprintln!("Resumed session \"{}\"", session.name);
1457 let sid = session.id.to_string();
1458 check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1459 attach_session(backend_id)?;
1460 Ok(format!("Detached from session \"{}\".", session.name))
1461 }
1462 Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1463 .await
1464 .map_err(|e| match e.downcast::<reqwest::Error>() {
1465 Ok(re) => friendly_error(&re, node),
1466 Err(other) => other,
1467 }),
1468 Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
1469 .await
1470 .map_err(|e| match e.downcast::<reqwest::Error>() {
1471 Ok(re) => friendly_error(&re, node),
1472 Err(other) => other,
1473 }),
1474 }
1475}
1476
1477#[cfg(test)]
1478mod tests {
1479 use super::*;
1480
/// `base_url` turns a `host:port` node address into an `http://` URL.
#[test]
fn test_base_url() {
    let cases = [
        ("localhost:7433", "http://localhost:7433"),
        ("my-machine:9999", "http://my-machine:9999"),
    ];
    for (node, expected) in cases {
        assert_eq!(base_url(node), expected);
    }
}
1486
/// `pulpo list` selects the `List` subcommand and leaves `--node` at its default.
#[test]
fn test_cli_parse_list() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    let is_list = matches!(parsed.command, Some(Commands::List { .. }));
    assert_eq!(parsed.node, "localhost:7433");
    assert!(is_list);
}
1493
/// `pulpo nodes` selects the `Nodes` subcommand.
#[test]
fn test_cli_parse_nodes() {
    let argv = ["pulpo", "nodes"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_nodes = matches!(parsed.command, Some(Commands::Nodes));
    assert!(is_nodes);
}
1499
/// `pulpo ui` selects the `Ui` subcommand.
#[test]
fn test_cli_parse_ui() {
    let argv = ["pulpo", "ui"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_ui = matches!(parsed.command, Some(Commands::Ui));
    assert!(is_ui);
}
1505
/// With a leading `--node` flag, the custom node is kept and the trailing `ui`
/// token ends up in the positional `path` argument.
///
/// NOTE(review): presumably a consequence of `args_conflicts_with_subcommands`
/// on `Cli` — confirm against the clap documentation.
#[test]
fn test_cli_parse_ui_custom_node() {
    let argv = ["pulpo", "--node", "mac-mini:7433", "ui"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let positional = parsed.path.as_deref();
    assert_eq!(parsed.node, "mac-mini:7433");
    assert_eq!(positional, Some("ui"));
}
1513
/// `build_open_command` passes the URL as the only argument and, on macOS and
/// Linux, uses the platform opener (`open` / `xdg-open`) as the program.
#[test]
fn test_build_open_command() {
    let url = "http://localhost:7433";
    let opener = build_open_command(url);
    let argv: Vec<&std::ffi::OsStr> = opener.get_args().collect();
    assert_eq!(argv, vec![url]);
    #[cfg(target_os = "macos")]
    assert_eq!(opener.get_program(), "open");
    #[cfg(target_os = "linux")]
    assert_eq!(opener.get_program(), "xdg-open");
}
1524
/// `spawn` accepts a positional name, `--workdir`, and a trailing command after `--`.
#[test]
fn test_cli_parse_spawn() {
    let argv = [
        "pulpo",
        "spawn",
        "my-task",
        "--workdir",
        "/tmp/repo",
        "--",
        "claude",
        "-p",
        "Fix the bug",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn {
        name,
        workdir,
        command,
        ..
    }) = &parsed.command
    else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(name.as_deref(), Some("my-task"));
    assert_eq!(workdir.as_deref(), Some("/tmp/repo"));
    assert_eq!(command, &["claude", "-p", "Fix the bug"]);
}
1546
/// `spawn --ink <name>` stores the ink name.
#[test]
fn test_cli_parse_spawn_with_ink() {
    let argv = ["pulpo", "spawn", "my-task", "--ink", "coder"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { ink, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(ink.as_deref(), Some("coder"));
}
1555
/// `spawn --description <text>` stores the description.
#[test]
fn test_cli_parse_spawn_with_description() {
    let argv = ["pulpo", "spawn", "my-task", "--description", "Fix the bug"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { description, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(description.as_deref(), Some("Fix the bug"));
}
1566
/// The first positional after `spawn` is the session name; everything after
/// `--` is the command to run.
#[test]
fn test_cli_parse_spawn_name_positional() {
    let argv = ["pulpo", "spawn", "portal", "--", "echo", "hello"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { name, command, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(name.as_deref(), Some("portal"));
    assert_eq!(command, &["echo", "hello"]);
}
1576
/// Without a trailing `-- ...` section the spawn command list is empty.
#[test]
fn test_cli_parse_spawn_no_command() {
    let argv = ["pulpo", "spawn", "my-task"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { command, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert!(command.is_empty());
}
1585
/// `--idle-threshold 0` is accepted and parsed as `Some(0)`.
#[test]
fn test_cli_parse_spawn_idle_threshold() {
    let argv = ["pulpo", "spawn", "my-task", "--idle-threshold", "0"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { idle_threshold, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(*idle_threshold, Some(0));
}
1595
/// `--idle-threshold 60` is parsed as `Some(60)`.
#[test]
fn test_cli_parse_spawn_idle_threshold_60() {
    let argv = ["pulpo", "spawn", "my-task", "--idle-threshold", "60"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { idle_threshold, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(*idle_threshold, Some(60));
}
1605
/// Repeating `--secret` collects every value, in order.
#[test]
fn test_cli_parse_spawn_secrets() {
    let argv = [
        "pulpo",
        "spawn",
        "my-task",
        "--secret",
        "GITHUB_TOKEN",
        "--secret",
        "NPM_TOKEN",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { secret, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(secret, &["GITHUB_TOKEN", "NPM_TOKEN"]);
}
1623
/// Without any `--secret` flag the secret list is empty.
#[test]
fn test_cli_parse_spawn_no_secrets() {
    let argv = ["pulpo", "spawn", "my-task"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { secret, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert!(secret.is_empty());
}
1632
/// `secret set <name> <value> --env <VAR>` captures the name, value and env override.
#[test]
fn test_cli_parse_secret_set_with_env() {
    let argv = [
        "pulpo",
        "secret",
        "set",
        "GH_WORK",
        "token123",
        "--env",
        "GITHUB_TOKEN",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Secret {
        action: SecretAction::Set { name, value, env },
    }) = &parsed.command
    else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(name, "GH_WORK");
    assert_eq!(value, "token123");
    assert_eq!(env.as_deref(), Some("GITHUB_TOKEN"));
}
1651
/// `secret set <name> <value>` without `--env` leaves the env override unset.
#[test]
fn test_cli_parse_secret_set_without_env() {
    let argv = ["pulpo", "secret", "set", "MY_KEY", "val"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Secret {
        action: SecretAction::Set { name, value, env },
    }) = &parsed.command
    else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(name, "MY_KEY");
    assert_eq!(value, "val");
    assert!(env.is_none());
}
1661
/// `spawn --worktree` sets the worktree flag.
#[test]
fn test_cli_parse_spawn_worktree() {
    let argv = ["pulpo", "spawn", "my-task", "--worktree"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { worktree, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert!(*worktree);
}
1670
/// `spawn --detach` sets the detach flag.
#[test]
fn test_cli_parse_spawn_detach() {
    let argv = ["pulpo", "spawn", "my-task", "--detach"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { detach, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert!(*detach);
}
1679
/// The short form `-d` also sets the detach flag.
#[test]
fn test_cli_parse_spawn_detach_short() {
    let argv = ["pulpo", "spawn", "my-task", "-d"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { detach, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert!(*detach);
}
1688
/// Detach defaults to `false` when neither `--detach` nor `-d` is given.
#[test]
fn test_cli_parse_spawn_detach_default() {
    let argv = ["pulpo", "spawn", "my-task"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { detach, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert!(!*detach);
}
1697
/// `logs <name>` defaults to 100 lines and no follow.
#[test]
fn test_cli_parse_logs() {
    let argv = ["pulpo", "logs", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Logs {
        name,
        lines,
        follow,
    }) = &parsed.command
    else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(*lines, 100);
    assert!(!*follow);
}
1706
/// `logs --lines 50` overrides the default line count.
#[test]
fn test_cli_parse_logs_with_lines() {
    let argv = ["pulpo", "logs", "my-session", "--lines", "50"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Logs {
        name,
        lines,
        follow,
    }) = &parsed.command
    else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(*lines, 50);
    assert!(!*follow);
}
1715
/// `logs --follow` sets the follow flag.
#[test]
fn test_cli_parse_logs_follow() {
    let argv = ["pulpo", "logs", "my-session", "--follow"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Logs { name, follow, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(*follow);
}
1724
/// The short form `-f` also sets the follow flag.
#[test]
fn test_cli_parse_logs_follow_short() {
    let argv = ["pulpo", "logs", "my-session", "-f"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Logs { name, follow, .. }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(*follow);
}
1733
/// `kill <name>` collects the session name into `names`.
#[test]
fn test_cli_parse_kill() {
    let argv = ["pulpo", "kill", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Kill { names }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(names, &["my-session"]);
}
1742
/// `delete <name>` collects the session name into `names`.
#[test]
fn test_cli_parse_delete() {
    let argv = ["pulpo", "delete", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Delete { names }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(names, &["my-session"]);
}
1751
/// `resume <name>` captures the session name.
#[test]
fn test_cli_parse_resume() {
    let argv = ["pulpo", "resume", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Resume { name }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
1760
/// `input <name> <text>` captures both the session name and the text to send.
#[test]
fn test_cli_parse_input() {
    let argv = ["pulpo", "input", "my-session", "yes"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Input { name, text }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("yes"));
}
1769
/// `input <name>` without text leaves `text` unset.
#[test]
fn test_cli_parse_input_no_text() {
    let argv = ["pulpo", "input", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Input { name, text }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(text.is_none());
}
1778
/// The `i` alias resolves to the `Input` subcommand.
#[test]
fn test_cli_parse_input_alias() {
    let argv = ["pulpo", "i", "my-session", "y"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Input { name, text }) = &parsed.command else {
        panic!("unexpected parse result: {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("y"));
}
1787
/// With a leading `--node` flag, the custom node is kept and the trailing
/// `list` token ends up in the positional `path` argument.
///
/// NOTE(review): presumably a consequence of `args_conflicts_with_subcommands`
/// on `Cli` — confirm against the clap documentation.
#[test]
fn test_cli_parse_custom_node() {
    let argv = ["pulpo", "--node", "win-pc:8080", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let positional = parsed.path.as_deref();
    assert_eq!(parsed.node, "win-pc:8080");
    assert_eq!(positional, Some("list"));
}
1795
/// `--version` makes clap return a `DisplayVersion` "error" instead of a parsed `Cli`.
#[test]
fn test_cli_version() {
    let kind = Cli::try_parse_from(["pulpo", "--version"])
        .unwrap_err()
        .kind();
    assert_eq!(kind, clap::error::ErrorKind::DisplayVersion);
}
1803
/// A bare `pulpo` invocation parses successfully with neither subcommand nor path.
#[test]
fn test_cli_parse_no_subcommand_succeeds() {
    let parsed = Cli::try_parse_from(["pulpo"]).unwrap();
    let (no_command, no_path) = (parsed.command.is_none(), parsed.path.is_none());
    assert!(no_command);
    assert!(no_path);
}
1810
/// The `Debug` output of a parsed `Cli` mentions the selected subcommand.
#[test]
fn test_cli_debug() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    let rendered = format!("{parsed:?}");
    assert!(rendered.contains("List"));
}
1817
1818 #[test]
1819 fn test_commands_debug() {
1820 let cmd = Commands::List { all: false };
1821 assert_eq!(format!("{cmd:?}"), "List { all: false }");
1822 }
1823
/// Canned JSON for a single session (name `repo`, status `active`) that the
/// in-process test servers return from session endpoints.
const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;

/// Wraps [`TEST_SESSION_JSON`] in a `{"session": ...}` envelope, the body the
/// test server sends back for a create-session request.
fn test_create_response_json() -> String {
    let mut body = String::from(r#"{"session":"#);
    body.push_str(TEST_SESSION_JSON);
    body.push('}');
    body
}
1831
1832 async fn start_test_server() -> String {
1834 use axum::http::StatusCode;
1835 use axum::{
1836 Json, Router,
1837 routing::{get, post},
1838 };
1839
1840 let create_json = test_create_response_json();
1841
1842 let app = Router::new()
1843 .route(
1844 "/api/v1/sessions",
1845 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1846 (StatusCode::CREATED, create_json.clone())
1847 }),
1848 )
1849 .route(
1850 "/api/v1/sessions/{id}",
1851 get(|| async { TEST_SESSION_JSON.to_owned() })
1852 .delete(|| async { StatusCode::NO_CONTENT }),
1853 )
1854 .route(
1855 "/api/v1/sessions/{id}/kill",
1856 post(|| async { StatusCode::NO_CONTENT }),
1857 )
1858 .route(
1859 "/api/v1/sessions/{id}/output",
1860 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1861 )
1862 .route(
1863 "/api/v1/peers",
1864 get(|| async {
1865 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1866 }),
1867 )
1868 .route(
1869 "/api/v1/sessions/{id}/resume",
1870 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1871 )
1872 .route(
1873 "/api/v1/sessions/{id}/interventions",
1874 get(|| async { "[]".to_owned() }),
1875 )
1876 .route(
1877 "/api/v1/sessions/{id}/input",
1878 post(|| async { StatusCode::NO_CONTENT }),
1879 )
1880 .route(
1881 "/api/v1/schedules",
1882 get(|| async { Json::<Vec<()>>(vec![]) })
1883 .post(|| async { StatusCode::CREATED }),
1884 )
1885 .route(
1886 "/api/v1/schedules/{id}",
1887 axum::routing::put(|| async { StatusCode::OK })
1888 .delete(|| async { StatusCode::NO_CONTENT }),
1889 );
1890
1891 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1892 let addr = listener.local_addr().unwrap();
1893 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1894 format!("127.0.0.1:{}", addr.port())
1895 }
1896
1897 #[tokio::test]
1898 async fn test_execute_list_success() {
1899 let node = start_test_server().await;
1900 let cli = Cli {
1901 node,
1902 token: None,
1903 command: Some(Commands::List { all: false }),
1904 path: None,
1905 };
1906 let result = execute(&cli).await.unwrap();
1907 assert_eq!(result, "No sessions.");
1908 }
1909
1910 #[tokio::test]
1911 async fn test_execute_nodes_success() {
1912 let node = start_test_server().await;
1913 let cli = Cli {
1914 node,
1915 token: None,
1916 command: Some(Commands::Nodes),
1917 path: None,
1918 };
1919 let result = execute(&cli).await.unwrap();
1920 assert!(result.contains("test"));
1921 assert!(result.contains("(local)"));
1922 assert!(result.contains("NAME"));
1923 }
1924
1925 #[tokio::test]
1926 async fn test_execute_spawn_success() {
1927 let node = start_test_server().await;
1928 let cli = Cli {
1929 node,
1930 token: None,
1931 command: Some(Commands::Spawn {
1932 name: Some("test".into()),
1933 workdir: Some("/tmp/repo".into()),
1934 ink: None,
1935 description: None,
1936 detach: true,
1937 idle_threshold: None,
1938 auto: false,
1939 worktree: false,
1940 runtime: None,
1941 secret: vec![],
1942 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1943 }),
1944 path: None,
1945 };
1946 let result = execute(&cli).await.unwrap();
1947 assert!(result.contains("Created session"));
1948 assert!(result.contains("repo"));
1949 }
1950
1951 #[tokio::test]
1952 async fn test_execute_spawn_with_all_flags() {
1953 let node = start_test_server().await;
1954 let cli = Cli {
1955 node,
1956 token: None,
1957 command: Some(Commands::Spawn {
1958 name: Some("test".into()),
1959 workdir: Some("/tmp/repo".into()),
1960 ink: Some("coder".into()),
1961 description: Some("Fix the bug".into()),
1962 detach: true,
1963 idle_threshold: None,
1964 auto: false,
1965 worktree: false,
1966 runtime: None,
1967 secret: vec![],
1968 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1969 }),
1970 path: None,
1971 };
1972 let result = execute(&cli).await.unwrap();
1973 assert!(result.contains("Created session"));
1974 }
1975
1976 #[tokio::test]
1977 async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
1978 let node = start_test_server().await;
1979 let cli = Cli {
1980 node,
1981 token: None,
1982 command: Some(Commands::Spawn {
1983 name: Some("full-opts".into()),
1984 workdir: Some("/tmp/repo".into()),
1985 ink: Some("coder".into()),
1986 description: Some("Full options".into()),
1987 detach: true,
1988 idle_threshold: Some(120),
1989 auto: false,
1990 worktree: true,
1991 runtime: Some("docker".into()),
1992 secret: vec![],
1993 command: vec!["claude".into()],
1994 }),
1995 path: None,
1996 };
1997 let result = execute(&cli).await.unwrap();
1998 assert!(result.contains("Created session"));
1999 }
2000
2001 #[tokio::test]
2002 async fn test_execute_spawn_no_name_derives_from_workdir() {
2003 let node = start_test_server().await;
2004 let cli = Cli {
2005 node,
2006 token: None,
2007 command: Some(Commands::Spawn {
2008 name: None,
2009 workdir: Some("/tmp/my-project".into()),
2010 ink: None,
2011 description: None,
2012 detach: true,
2013 idle_threshold: None,
2014 auto: false,
2015 worktree: false,
2016 runtime: None,
2017 secret: vec![],
2018 command: vec!["echo".into(), "hello".into()],
2019 }),
2020 path: None,
2021 };
2022 let result = execute(&cli).await.unwrap();
2023 assert!(result.contains("Created session"));
2024 }
2025
2026 #[tokio::test]
2027 async fn test_execute_spawn_no_command() {
2028 let node = start_test_server().await;
2029 let cli = Cli {
2030 node,
2031 token: None,
2032 command: Some(Commands::Spawn {
2033 name: Some("test".into()),
2034 workdir: Some("/tmp/repo".into()),
2035 ink: None,
2036 description: None,
2037 detach: true,
2038 idle_threshold: None,
2039 auto: false,
2040 worktree: false,
2041 runtime: None,
2042 secret: vec![],
2043 command: vec![],
2044 }),
2045 path: None,
2046 };
2047 let result = execute(&cli).await.unwrap();
2048 assert!(result.contains("Created session"));
2049 }
2050
2051 #[tokio::test]
2052 async fn test_execute_spawn_with_name() {
2053 let node = start_test_server().await;
2054 let cli = Cli {
2055 node,
2056 token: None,
2057 command: Some(Commands::Spawn {
2058 name: Some("my-task".into()),
2059 workdir: Some("/tmp/repo".into()),
2060 ink: None,
2061 description: None,
2062 detach: true,
2063 idle_threshold: None,
2064 auto: false,
2065 worktree: false,
2066 runtime: None,
2067 secret: vec![],
2068 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2069 }),
2070 path: None,
2071 };
2072 let result = execute(&cli).await.unwrap();
2073 assert!(result.contains("Created session"));
2074 }
2075
2076 #[tokio::test]
2077 async fn test_execute_spawn_auto_attach() {
2078 let node = start_test_server().await;
2079 let cli = Cli {
2080 node,
2081 token: None,
2082 command: Some(Commands::Spawn {
2083 name: Some("test".into()),
2084 workdir: Some("/tmp/repo".into()),
2085 ink: None,
2086 description: None,
2087 detach: false,
2088 idle_threshold: None,
2089 auto: false,
2090 worktree: false,
2091 runtime: None,
2092 secret: vec![],
2093 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2094 }),
2095 path: None,
2096 };
2097 let result = execute(&cli).await.unwrap();
2098 assert!(result.contains("Detached from session"));
2100 }
2101
2102 #[tokio::test]
2103 async fn test_execute_kill_success() {
2104 let node = start_test_server().await;
2105 let cli = Cli {
2106 node,
2107 token: None,
2108 command: Some(Commands::Kill {
2109 names: vec!["test-session".into()],
2110 }),
2111 path: None,
2112 };
2113 let result = execute(&cli).await.unwrap();
2114 assert!(result.contains("killed"));
2115 }
2116
2117 #[tokio::test]
2118 async fn test_execute_delete_success() {
2119 let node = start_test_server().await;
2120 let cli = Cli {
2121 node,
2122 token: None,
2123 command: Some(Commands::Delete {
2124 names: vec!["test-session".into()],
2125 }),
2126 path: None,
2127 };
2128 let result = execute(&cli).await.unwrap();
2129 assert!(result.contains("deleted"));
2130 }
2131
2132 #[tokio::test]
2133 async fn test_execute_logs_success() {
2134 let node = start_test_server().await;
2135 let cli = Cli {
2136 node,
2137 token: None,
2138 command: Some(Commands::Logs {
2139 name: "test-session".into(),
2140 lines: 50,
2141 follow: false,
2142 }),
2143 path: None,
2144 };
2145 let result = execute(&cli).await.unwrap();
2146 assert!(result.contains("test output"));
2147 }
2148
2149 #[tokio::test]
2150 async fn test_execute_list_connection_refused() {
2151 let cli = Cli {
2152 node: "localhost:1".into(),
2153 token: None,
2154 command: Some(Commands::List { all: false }),
2155 path: None,
2156 };
2157 let result = execute(&cli).await;
2158 let err = result.unwrap_err().to_string();
2159 assert!(
2160 err.contains("Could not connect to pulpod"),
2161 "Expected friendly error, got: {err}"
2162 );
2163 assert!(err.contains("localhost:1"));
2164 }
2165
2166 #[tokio::test]
2167 async fn test_execute_nodes_connection_refused() {
2168 let cli = Cli {
2169 node: "localhost:1".into(),
2170 token: None,
2171 command: Some(Commands::Nodes),
2172 path: None,
2173 };
2174 let result = execute(&cli).await;
2175 let err = result.unwrap_err().to_string();
2176 assert!(err.contains("Could not connect to pulpod"));
2177 }
2178
2179 #[tokio::test]
2180 async fn test_execute_kill_error_response() {
2181 use axum::{Router, http::StatusCode, routing::post};
2182
2183 let app = Router::new().route(
2184 "/api/v1/sessions/{id}/kill",
2185 post(|| async {
2186 (
2187 StatusCode::NOT_FOUND,
2188 "{\"error\":\"session not found: test-session\"}",
2189 )
2190 }),
2191 );
2192 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2193 let addr = listener.local_addr().unwrap();
2194 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2195 let node = format!("127.0.0.1:{}", addr.port());
2196
2197 let cli = Cli {
2198 node,
2199 token: None,
2200 command: Some(Commands::Kill {
2201 names: vec!["test-session".into()],
2202 }),
2203 path: None,
2204 };
2205 let result = execute(&cli).await.unwrap();
2206 assert!(result.contains("Error killing test-session"), "{result}");
2207 }
2208
2209 #[tokio::test]
2210 async fn test_execute_delete_error_response() {
2211 use axum::{Router, http::StatusCode, routing::delete};
2212
2213 let app = Router::new().route(
2214 "/api/v1/sessions/{id}",
2215 delete(|| async {
2216 (
2217 StatusCode::CONFLICT,
2218 "{\"error\":\"cannot delete session in 'running' state\"}",
2219 )
2220 }),
2221 );
2222 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2223 let addr = listener.local_addr().unwrap();
2224 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2225 let node = format!("127.0.0.1:{}", addr.port());
2226
2227 let cli = Cli {
2228 node,
2229 token: None,
2230 command: Some(Commands::Delete {
2231 names: vec!["test-session".into()],
2232 }),
2233 path: None,
2234 };
2235 let result = execute(&cli).await.unwrap();
2236 assert!(result.contains("Error deleting test-session"), "{result}");
2237 }
2238
2239 #[tokio::test]
2240 async fn test_execute_logs_error_response() {
2241 use axum::{Router, http::StatusCode, routing::get};
2242
2243 let app = Router::new().route(
2244 "/api/v1/sessions/{id}/output",
2245 get(|| async {
2246 (
2247 StatusCode::NOT_FOUND,
2248 "{\"error\":\"session not found: ghost\"}",
2249 )
2250 }),
2251 );
2252 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2253 let addr = listener.local_addr().unwrap();
2254 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2255 let node = format!("127.0.0.1:{}", addr.port());
2256
2257 let cli = Cli {
2258 node,
2259 token: None,
2260 command: Some(Commands::Logs {
2261 name: "ghost".into(),
2262 lines: 50,
2263 follow: false,
2264 }),
2265 path: None,
2266 };
2267 let err = execute(&cli).await.unwrap_err();
2268 assert_eq!(err.to_string(), "session not found: ghost");
2269 }
2270
2271 #[tokio::test]
2272 async fn test_execute_resume_error_response() {
2273 use axum::{Router, http::StatusCode, routing::post};
2274
2275 let app = Router::new().route(
2276 "/api/v1/sessions/{id}/resume",
2277 post(|| async {
2278 (
2279 StatusCode::BAD_REQUEST,
2280 "{\"error\":\"session is not lost (status: active)\"}",
2281 )
2282 }),
2283 );
2284 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2285 let addr = listener.local_addr().unwrap();
2286 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2287 let node = format!("127.0.0.1:{}", addr.port());
2288
2289 let cli = Cli {
2290 node,
2291 token: None,
2292 command: Some(Commands::Resume {
2293 name: "test-session".into(),
2294 }),
2295 path: None,
2296 };
2297 let err = execute(&cli).await.unwrap_err();
2298 assert_eq!(err.to_string(), "session is not lost (status: active)");
2299 }
2300
2301 #[tokio::test]
2302 async fn test_execute_spawn_error_response() {
2303 use axum::{Router, http::StatusCode, routing::post};
2304
2305 let app = Router::new().route(
2306 "/api/v1/sessions",
2307 post(|| async {
2308 (
2309 StatusCode::INTERNAL_SERVER_ERROR,
2310 "{\"error\":\"failed to spawn session\"}",
2311 )
2312 }),
2313 );
2314 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2315 let addr = listener.local_addr().unwrap();
2316 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2317 let node = format!("127.0.0.1:{}", addr.port());
2318
2319 let cli = Cli {
2320 node,
2321 token: None,
2322 command: Some(Commands::Spawn {
2323 name: Some("test".into()),
2324 workdir: Some("/tmp/repo".into()),
2325 ink: None,
2326 description: None,
2327 detach: true,
2328 idle_threshold: None,
2329 auto: false,
2330 worktree: false,
2331 runtime: None,
2332 secret: vec![],
2333 command: vec!["test".into()],
2334 }),
2335 path: None,
2336 };
2337 let err = execute(&cli).await.unwrap_err();
2338 assert_eq!(err.to_string(), "failed to spawn session");
2339 }
2340
2341 #[tokio::test]
2342 async fn test_execute_interventions_error_response() {
2343 use axum::{Router, http::StatusCode, routing::get};
2344
2345 let app = Router::new().route(
2346 "/api/v1/sessions/{id}/interventions",
2347 get(|| async {
2348 (
2349 StatusCode::NOT_FOUND,
2350 "{\"error\":\"session not found: ghost\"}",
2351 )
2352 }),
2353 );
2354 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2355 let addr = listener.local_addr().unwrap();
2356 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2357 let node = format!("127.0.0.1:{}", addr.port());
2358
2359 let cli = Cli {
2360 node,
2361 token: None,
2362 command: Some(Commands::Interventions {
2363 name: "ghost".into(),
2364 }),
2365 path: None,
2366 };
2367 let err = execute(&cli).await.unwrap_err();
2368 assert_eq!(err.to_string(), "session not found: ghost");
2369 }
2370
2371 #[tokio::test]
2372 async fn test_execute_resume_success() {
2373 let node = start_test_server().await;
2374 let cli = Cli {
2375 node,
2376 token: None,
2377 command: Some(Commands::Resume {
2378 name: "test-session".into(),
2379 }),
2380 path: None,
2381 };
2382 let result = execute(&cli).await.unwrap();
2383 assert!(result.contains("Detached from session"));
2384 }
2385
2386 #[tokio::test]
2387 async fn test_execute_input_success() {
2388 let node = start_test_server().await;
2389 let cli = Cli {
2390 node,
2391 token: None,
2392 command: Some(Commands::Input {
2393 name: "test-session".into(),
2394 text: Some("yes".into()),
2395 }),
2396 path: None,
2397 };
2398 let result = execute(&cli).await.unwrap();
2399 assert!(result.contains("Sent input to session test-session"));
2400 }
2401
2402 #[tokio::test]
2403 async fn test_execute_input_no_text() {
2404 let node = start_test_server().await;
2405 let cli = Cli {
2406 node,
2407 token: None,
2408 command: Some(Commands::Input {
2409 name: "test-session".into(),
2410 text: None,
2411 }),
2412 path: None,
2413 };
2414 let result = execute(&cli).await.unwrap();
2415 assert!(result.contains("Sent input to session test-session"));
2416 }
2417
2418 #[tokio::test]
2419 async fn test_execute_input_connection_refused() {
2420 let cli = Cli {
2421 node: "localhost:1".into(),
2422 token: None,
2423 command: Some(Commands::Input {
2424 name: "test".into(),
2425 text: Some("y".into()),
2426 }),
2427 path: None,
2428 };
2429 let result = execute(&cli).await;
2430 let err = result.unwrap_err().to_string();
2431 assert!(err.contains("Could not connect to pulpod"));
2432 }
2433
2434 #[tokio::test]
2435 async fn test_execute_input_error_response() {
2436 use axum::{Router, http::StatusCode, routing::post};
2437
2438 let app = Router::new().route(
2439 "/api/v1/sessions/{id}/input",
2440 post(|| async {
2441 (
2442 StatusCode::NOT_FOUND,
2443 "{\"error\":\"session not found: ghost\"}",
2444 )
2445 }),
2446 );
2447 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2448 let addr = listener.local_addr().unwrap();
2449 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2450 let node = format!("127.0.0.1:{}", addr.port());
2451
2452 let cli = Cli {
2453 node,
2454 token: None,
2455 command: Some(Commands::Input {
2456 name: "ghost".into(),
2457 text: Some("y".into()),
2458 }),
2459 path: None,
2460 };
2461 let err = execute(&cli).await.unwrap_err();
2462 assert_eq!(err.to_string(), "session not found: ghost");
2463 }
2464
2465 #[tokio::test]
2466 async fn test_execute_ui() {
2467 let cli = Cli {
2468 node: "localhost:7433".into(),
2469 token: None,
2470 command: Some(Commands::Ui),
2471 path: None,
2472 };
2473 let result = execute(&cli).await.unwrap();
2474 assert!(result.contains("Opening"));
2475 assert!(result.contains("http://localhost:7433"));
2476 }
2477
2478 #[tokio::test]
2479 async fn test_execute_ui_custom_node() {
2480 let cli = Cli {
2481 node: "mac-mini:7433".into(),
2482 token: None,
2483 command: Some(Commands::Ui),
2484 path: None,
2485 };
2486 let result = execute(&cli).await.unwrap();
2487 assert!(result.contains("http://mac-mini:7433"));
2488 }
2489
/// An empty session list renders as the `No sessions.` placeholder.
#[test]
fn test_format_sessions_empty() {
    let none: [Session; 0] = [];
    assert_eq!(format_sessions(&none), "No sessions.");
}
2494
/// A single tmux session renders a table with the expected column headers and
/// the session's short id, name, status, runtime and command.
#[test]
fn test_format_sessions_with_data() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "my-api".into(),
        workdir: "/tmp/repo".into(),
        command: "claude -p 'Fix the bug'".into(),
        description: Some("Fix the bug".into()),
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let table = format_sessions(&[session]);

    let expected = [
        // Column headers.
        "ID",
        "NAME",
        "RUNTIME",
        "COMMAND",
        // Row content.
        "00000000",
        "my-api",
        "active",
        "tmux",
        "claude -p 'Fix the bug'",
    ];
    for needle in expected {
        assert!(table.contains(needle));
    }
}
2535
/// A session using the Docker runtime is labelled `docker` in the listing.
#[test]
fn test_format_sessions_docker_runtime() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "sandbox-test".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: Some("docker:pulpo-sandbox-test".into()),
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Docker,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    let table = format_sessions(&[session]);
    assert!(table.contains("docker"));
}
2568
/// A long command is rendered with an ellipsis in the listing.
#[test]
fn test_format_sessions_long_command_truncated() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        command:
            "claude -p 'A very long command that exceeds fifty characters in total length here'"
                .into(),
        description: None,
        status: SessionStatus::Ready,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    let table = format_sessions(&[session]);
    assert!(table.contains("..."));
}
2603
/// A session with a worktree path gets a `[wt]` marker right after its name.
#[test]
fn test_format_sessions_worktree_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "wt-task".into(),
        workdir: "/repo/.pulpo/worktrees/wt-task".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: Some("/repo/.pulpo/worktrees/wt-task".into()),
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    let output = format_sessions(&[session]);

    assert!(
        output.contains("[wt]"),
        "should show worktree indicator: {output}"
    );
    assert!(output.contains("wt-task [wt]"));
}
2640
/// A session whose metadata carries `pr_url` gets a `[PR]` marker right after its name.
#[test]
fn test_format_sessions_pr_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use std::collections::HashMap;
    use uuid::Uuid;

    let mut pr_metadata = HashMap::new();
    pr_metadata.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());

    let session = Session {
        id: Uuid::nil(),
        name: "pr-task".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: Some(pr_metadata),
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    let output = format_sessions(&[session]);

    assert!(
        output.contains("[PR]"),
        "should show PR indicator: {output}"
    );
    assert!(output.contains("pr-task [PR]"));
}
2680
/// With both a worktree and a PR URL, the markers appear together as `[wt] [PR]`.
#[test]
fn test_format_sessions_worktree_and_pr_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use std::collections::HashMap;
    use uuid::Uuid;

    let mut pr_metadata = HashMap::new();
    pr_metadata.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());

    let session = Session {
        id: Uuid::nil(),
        name: "both-task".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: Some(pr_metadata),
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: Some("/repo/.pulpo/worktrees/both-task".into()),
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    let output = format_sessions(&[session]);

    assert!(
        output.contains("[wt] [PR]"),
        "should show both indicators: {output}"
    );
}
2719
/// Without metadata, the listing must not show a `[PR]` marker.
#[test]
fn test_format_sessions_no_pr_without_metadata() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "no-pr".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    let output = format_sessions(&[session]);

    assert!(
        !output.contains("[PR]"),
        "should not show PR indicator: {output}"
    );
}
2755
/// `format_nodes` lists the local node (tagged `(local)`) and each peer along
/// with its session count.
#[test]
fn test_format_nodes() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let resp = PeersResponse {
        local: NodeInfo {
            name: "mac-mini".into(),
            hostname: "h".into(),
            os: "macos".into(),
            arch: "arm64".into(),
            cpus: 8,
            memory_mb: 16384,
            gpu: None,
        },
        peers: vec![PeerInfo {
            name: "win-pc".into(),
            address: "win-pc:7433".into(),
            status: PeerStatus::Online,
            node_info: None,
            // Chosen so its digits appear nowhere else in this fixture (port
            // 7433, 8 CPUs, 16384 MB); a count of 3 would be satisfied by the
            // port number alone and would not prove the count is rendered.
            session_count: Some(42),
            source: PeerSource::Configured,
        }],
    };
    let output = format_nodes(&resp);
    assert!(output.contains("mac-mini"));
    assert!(output.contains("(local)"));

    // Check the count on the peer's own row rather than anywhere in the table.
    let peer_row = output
        .lines()
        .find(|line| line.contains("win-pc"))
        .expect("peer row should be rendered");
    assert!(
        peer_row.contains("42"),
        "peer row should show the session count: {output}"
    );
}
2786
/// An offline peer with no session count shows its status and a `-` on the third output line.
#[test]
fn test_format_nodes_no_session_count() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "local".into(),
        hostname: "h".into(),
        os: "linux".into(),
        arch: "x86_64".into(),
        cpus: 4,
        memory_mb: 8192,
        gpu: None,
    };
    let offline_peer = PeerInfo {
        name: "peer".into(),
        address: "peer:7433".into(),
        status: PeerStatus::Offline,
        node_info: None,
        session_count: None,
        source: PeerSource::Configured,
    };
    let resp = PeersResponse {
        local,
        peers: vec![offline_peer],
    };

    let rendered = format_nodes(&resp);
    assert!(rendered.contains("offline"));

    // The third line (index 2) is expected to carry the dash placeholder.
    let rows: Vec<&str> = rendered.lines().collect();
    assert!(rows[2].contains('-'));
}
2817
2818 #[tokio::test]
2819 async fn test_execute_resume_connection_refused() {
2820 let cli = Cli {
2821 node: "localhost:1".into(),
2822 token: None,
2823 command: Some(Commands::Resume {
2824 name: "test".into(),
2825 }),
2826 path: None,
2827 };
2828 let result = execute(&cli).await;
2829 let err = result.unwrap_err().to_string();
2830 assert!(err.contains("Could not connect to pulpod"));
2831 }
2832
2833 #[tokio::test]
2834 async fn test_execute_spawn_connection_refused() {
2835 let cli = Cli {
2836 node: "localhost:1".into(),
2837 token: None,
2838 command: Some(Commands::Spawn {
2839 name: Some("test".into()),
2840 workdir: Some("/tmp".into()),
2841 ink: None,
2842 description: None,
2843 detach: true,
2844 idle_threshold: None,
2845 auto: false,
2846 worktree: false,
2847 runtime: None,
2848 secret: vec![],
2849 command: vec!["test".into()],
2850 }),
2851 path: None,
2852 };
2853 let result = execute(&cli).await;
2854 let err = result.unwrap_err().to_string();
2855 assert!(err.contains("Could not connect to pulpod"));
2856 }
2857
2858 #[tokio::test]
2859 async fn test_execute_kill_connection_refused() {
2860 let cli = Cli {
2861 node: "localhost:1".into(),
2862 token: None,
2863 command: Some(Commands::Kill {
2864 names: vec!["test".into()],
2865 }),
2866 path: None,
2867 };
2868 let result = execute(&cli).await;
2869 let err = result.unwrap_err().to_string();
2870 assert!(err.contains("Could not connect to pulpod"));
2871 }
2872
2873 #[tokio::test]
2874 async fn test_execute_delete_connection_refused() {
2875 let cli = Cli {
2876 node: "localhost:1".into(),
2877 token: None,
2878 command: Some(Commands::Delete {
2879 names: vec!["test".into()],
2880 }),
2881 path: None,
2882 };
2883 let result = execute(&cli).await;
2884 let err = result.unwrap_err().to_string();
2885 assert!(err.contains("Could not connect to pulpod"));
2886 }
2887
2888 #[tokio::test]
2889 async fn test_execute_logs_connection_refused() {
2890 let cli = Cli {
2891 node: "localhost:1".into(),
2892 token: None,
2893 command: Some(Commands::Logs {
2894 name: "test".into(),
2895 lines: 50,
2896 follow: false,
2897 }),
2898 path: None,
2899 };
2900 let result = execute(&cli).await;
2901 let err = result.unwrap_err().to_string();
2902 assert!(err.contains("Could not connect to pulpod"));
2903 }
2904
2905 #[tokio::test]
2906 async fn test_friendly_error_connect() {
2907 let err = reqwest::Client::new()
2909 .get("http://127.0.0.1:1")
2910 .send()
2911 .await
2912 .unwrap_err();
2913 let friendly = friendly_error(&err, "test-node:1");
2914 let msg = friendly.to_string();
2915 assert!(
2916 msg.contains("Could not connect"),
2917 "Expected connect message, got: {msg}"
2918 );
2919 }
2920
2921 #[tokio::test]
2922 async fn test_friendly_error_other() {
2923 let err = reqwest::Client::new()
2925 .get("http://[::invalid::url")
2926 .send()
2927 .await
2928 .unwrap_err();
2929 let friendly = friendly_error(&err, "bad-host");
2930 let msg = friendly.to_string();
2931 assert!(
2932 msg.contains("Network error"),
2933 "Expected network error message, got: {msg}"
2934 );
2935 assert!(msg.contains("bad-host"));
2936 }
2937
/// `is_localhost` accepts loopback spellings (with or without a port) and rejects other hosts.
#[test]
fn test_is_localhost_variants() {
    // (node, expected result)
    let cases = [
        ("localhost:7433", true),
        ("127.0.0.1:7433", true),
        ("[::1]:7433", true),
        ("::1", true),
        ("localhost", true),
        ("mac-mini:7433", false),
        ("192.168.1.100:7433", false),
    ];

    for (node, expected) in cases {
        assert_eq!(is_localhost(node), expected, "node: {node}");
    }
}
2950
/// `authed_get` with a token sets a `Bearer` authorization header.
#[test]
fn test_authed_get_with_token() {
    let client = reqwest::Client::new();
    let request = authed_get(&client, "http://h:1/api".into(), Some("tok"))
        .build()
        .unwrap();

    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer tok");
}
2965
/// `authed_get` without a token leaves the authorization header unset.
#[test]
fn test_authed_get_without_token() {
    let client = reqwest::Client::new();
    let request = authed_get(&client, "http://h:1/api".into(), None)
        .build()
        .unwrap();

    let header = request.headers().get("authorization");
    assert!(header.is_none());
}
2974
/// `authed_post` with a token sets a `Bearer` authorization header.
#[test]
fn test_authed_post_with_token() {
    let client = reqwest::Client::new();
    let request = authed_post(&client, "http://h:1/api".into(), Some("secret"))
        .build()
        .unwrap();

    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer secret");
}
2989
/// `authed_post` without a token leaves the authorization header unset.
#[test]
fn test_authed_post_without_token() {
    let client = reqwest::Client::new();
    let request = authed_post(&client, "http://h:1/api".into(), None)
        .build()
        .unwrap();

    let header = request.headers().get("authorization");
    assert!(header.is_none());
}
2998
/// `authed_delete` with a token sets a `Bearer` authorization header.
#[test]
fn test_authed_delete_with_token() {
    let client = reqwest::Client::new();
    let request = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
        .build()
        .unwrap();

    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer del-tok");
}
3013
/// `authed_delete` without a token leaves the authorization header unset.
#[test]
fn test_authed_delete_without_token() {
    let client = reqwest::Client::new();
    let request = authed_delete(&client, "http://h:1/api".into(), None)
        .build()
        .unwrap();

    let header = request.headers().get("authorization");
    assert!(header.is_none());
}
3022
3023 #[tokio::test]
3024 async fn test_resolve_token_explicit() {
3025 let client = reqwest::Client::new();
3026 let token =
3027 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
3028 assert_eq!(token, Some("my-tok".into()));
3029 }
3030
3031 #[tokio::test]
3032 async fn test_resolve_token_remote_no_explicit() {
3033 let client = reqwest::Client::new();
3034 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
3035 assert_eq!(token, None);
3036 }
3037
3038 #[tokio::test]
3039 async fn test_resolve_token_localhost_auto_discover() {
3040 use axum::{Json, Router, routing::get};
3041
3042 let app = Router::new().route(
3043 "/api/v1/auth/token",
3044 get(|| async {
3045 Json(AuthTokenResponse {
3046 token: "discovered".into(),
3047 })
3048 }),
3049 );
3050 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3051 let addr = listener.local_addr().unwrap();
3052 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3053
3054 let node = format!("localhost:{}", addr.port());
3055 let base = base_url(&node);
3056 let client = reqwest::Client::new();
3057 let token = resolve_token(&client, &base, &node, None).await;
3058 assert_eq!(token, Some("discovered".into()));
3059 }
3060
3061 #[tokio::test]
3062 async fn test_discover_token_empty_returns_none() {
3063 use axum::{Json, Router, routing::get};
3064
3065 let app = Router::new().route(
3066 "/api/v1/auth/token",
3067 get(|| async {
3068 Json(AuthTokenResponse {
3069 token: String::new(),
3070 })
3071 }),
3072 );
3073 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3074 let addr = listener.local_addr().unwrap();
3075 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3076
3077 let base = format!("http://127.0.0.1:{}", addr.port());
3078 let client = reqwest::Client::new();
3079 assert_eq!(discover_token(&client, &base).await, None);
3080 }
3081
3082 #[tokio::test]
3083 async fn test_discover_token_unreachable_returns_none() {
3084 let client = reqwest::Client::new();
3085 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3086 }
3087
/// The global `--token` flag is captured on the parsed CLI.
#[test]
fn test_cli_parse_with_token() {
    let argv = ["pulpo", "--token", "my-secret", "list"];

    let parsed = Cli::try_parse_from(argv).unwrap();

    assert_eq!(parsed.token, Some("my-secret".into()));
}
3093
/// Without `--token`, the parsed CLI carries no token.
#[test]
fn test_cli_parse_without_token() {
    let argv = ["pulpo", "list"];

    let parsed = Cli::try_parse_from(argv).unwrap();

    assert_eq!(parsed.token, None);
}
3099
3100 #[tokio::test]
3101 async fn test_execute_with_explicit_token_sends_header() {
3102 use axum::{Router, extract::Request, http::StatusCode, routing::get};
3103
3104 let app = Router::new().route(
3105 "/api/v1/sessions",
3106 get(|req: Request| async move {
3107 let auth = req
3108 .headers()
3109 .get("authorization")
3110 .and_then(|v| v.to_str().ok())
3111 .unwrap_or("");
3112 assert_eq!(auth, "Bearer test-token");
3113 (StatusCode::OK, "[]".to_owned())
3114 }),
3115 );
3116 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3117 let addr = listener.local_addr().unwrap();
3118 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3119 let node = format!("127.0.0.1:{}", addr.port());
3120
3121 let cli = Cli {
3122 node,
3123 token: Some("test-token".into()),
3124 command: Some(Commands::List { all: false }),
3125 path: None,
3126 };
3127 let result = execute(&cli).await.unwrap();
3128 assert_eq!(result, "No sessions.");
3129 }
3130
/// `pulpo interventions <name>` parses into `Commands::Interventions` with that name.
#[test]
fn test_cli_parse_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();

    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Interventions { name }) if name == "my-session"
    );
    assert!(is_expected);
}
3141
/// An empty intervention list renders as a fixed placeholder message.
#[test]
fn test_format_interventions_empty() {
    let rendered = format_interventions(&[]);
    assert_eq!(rendered, "No intervention events.");
}
3146
/// A populated intervention list renders the column headers plus each event's reason and timestamp.
#[test]
fn test_format_interventions_with_data() {
    let memory_event = InterventionEventResponse {
        id: 1,
        session_id: "sess-1".into(),
        code: None,
        reason: "Memory exceeded threshold".into(),
        created_at: "2026-01-01T00:00:00Z".into(),
    };
    let idle_event = InterventionEventResponse {
        id: 2,
        session_id: "sess-1".into(),
        code: None,
        reason: "Idle for 10 minutes".into(),
        created_at: "2026-01-02T00:00:00Z".into(),
    };

    let table = format_interventions(&[memory_event, idle_event]);

    // Header row.
    assert!(table.contains("ID"));
    assert!(table.contains("TIMESTAMP"));
    assert!(table.contains("REASON"));
    // Event rows.
    assert!(table.contains("Memory exceeded threshold"));
    assert!(table.contains("Idle for 10 minutes"));
    assert!(table.contains("2026-01-01T00:00:00Z"));
}
3173
3174 #[tokio::test]
3175 async fn test_execute_interventions_empty() {
3176 let node = start_test_server().await;
3177 let cli = Cli {
3178 node,
3179 token: None,
3180 command: Some(Commands::Interventions {
3181 name: "my-session".into(),
3182 }),
3183 path: None,
3184 };
3185 let result = execute(&cli).await.unwrap();
3186 assert_eq!(result, "No intervention events.");
3187 }
3188
3189 #[tokio::test]
3190 async fn test_execute_interventions_with_data() {
3191 use axum::{Router, routing::get};
3192
3193 let app = Router::new().route(
3194 "/api/v1/sessions/{id}/interventions",
3195 get(|| async {
3196 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3197 .to_owned()
3198 }),
3199 );
3200 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3201 let addr = listener.local_addr().unwrap();
3202 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3203 let node = format!("127.0.0.1:{}", addr.port());
3204
3205 let cli = Cli {
3206 node,
3207 token: None,
3208 command: Some(Commands::Interventions {
3209 name: "test".into(),
3210 }),
3211 path: None,
3212 };
3213 let result = execute(&cli).await.unwrap();
3214 assert!(result.contains("OOM"));
3215 assert!(result.contains("2026-01-01T00:00:00Z"));
3216 }
3217
3218 #[tokio::test]
3219 async fn test_execute_interventions_connection_refused() {
3220 let cli = Cli {
3221 node: "localhost:1".into(),
3222 token: None,
3223 command: Some(Commands::Interventions {
3224 name: "test".into(),
3225 }),
3226 path: None,
3227 };
3228 let result = execute(&cli).await;
3229 let err = result.unwrap_err().to_string();
3230 assert!(err.contains("Could not connect to pulpod"));
3231 }
3232
/// A plain backend id produces a `tmux attach-session -t <id>` command.
#[test]
fn test_build_attach_command_tmux() {
    let attach = build_attach_command("my-session");

    assert_eq!(attach.get_program(), "tmux");

    let argv: Vec<&std::ffi::OsStr> = attach.get_args().collect();
    assert_eq!(argv, vec!["attach-session", "-t", "my-session"]);
}
3242
/// A `docker:`-prefixed backend id produces a `docker exec -it <container> /bin/sh` command.
#[test]
fn test_build_attach_command_docker() {
    let attach = build_attach_command("docker:pulpo-my-task");

    assert_eq!(attach.get_program(), "docker");

    let argv: Vec<&std::ffi::OsStr> = attach.get_args().collect();
    assert_eq!(argv, vec!["exec", "-it", "pulpo-my-task", "/bin/sh"]);
}
3250
/// `pulpo attach <name>` parses into `Commands::Attach` with that name.
#[test]
fn test_cli_parse_attach() {
    let parsed = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();

    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Attach { name }) if name == "my-session"
    );
    assert!(is_expected);
}
3259
/// The `a` alias parses into `Commands::Attach`.
#[test]
fn test_cli_parse_attach_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();

    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Attach { name }) if name == "my-session"
    );
    assert!(is_expected);
}
3268
3269 #[tokio::test]
3270 async fn test_execute_attach_success() {
3271 let node = start_test_server().await;
3272 let cli = Cli {
3273 node,
3274 token: None,
3275 command: Some(Commands::Attach {
3276 name: "test-session".into(),
3277 }),
3278 path: None,
3279 };
3280 let result = execute(&cli).await.unwrap();
3281 assert!(result.contains("Detached from session test-session"));
3282 }
3283
3284 #[tokio::test]
3285 async fn test_execute_attach_with_backend_session_id() {
3286 use axum::{Router, routing::get};
3287 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3288 let app = Router::new().route(
3289 "/api/v1/sessions/{id}",
3290 get(move || async move { session_json.to_owned() }),
3291 );
3292 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3293 let addr = listener.local_addr().unwrap();
3294 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3295
3296 let cli = Cli {
3297 node: format!("127.0.0.1:{}", addr.port()),
3298 token: None,
3299 command: Some(Commands::Attach {
3300 name: "my-session".into(),
3301 }),
3302 path: None,
3303 };
3304 let result = execute(&cli).await.unwrap();
3305 assert!(result.contains("Detached from session my-session"));
3306 }
3307
3308 #[tokio::test]
3309 async fn test_execute_attach_connection_refused() {
3310 let cli = Cli {
3311 node: "localhost:1".into(),
3312 token: None,
3313 command: Some(Commands::Attach {
3314 name: "test-session".into(),
3315 }),
3316 path: None,
3317 };
3318 let result = execute(&cli).await;
3319 let err = result.unwrap_err().to_string();
3320 assert!(err.contains("Could not connect to pulpod"));
3321 }
3322
3323 #[tokio::test]
3324 async fn test_execute_attach_error_response() {
3325 use axum::{Router, http::StatusCode, routing::get};
3326 let app = Router::new().route(
3327 "/api/v1/sessions/{id}",
3328 get(|| async {
3329 (
3330 StatusCode::NOT_FOUND,
3331 r#"{"error":"session not found"}"#.to_owned(),
3332 )
3333 }),
3334 );
3335 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3336 let addr = listener.local_addr().unwrap();
3337 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3338
3339 let cli = Cli {
3340 node: format!("127.0.0.1:{}", addr.port()),
3341 token: None,
3342 command: Some(Commands::Attach {
3343 name: "nonexistent".into(),
3344 }),
3345 path: None,
3346 };
3347 let result = execute(&cli).await;
3348 let err = result.unwrap_err().to_string();
3349 assert!(err.contains("session not found"));
3350 }
3351
3352 #[tokio::test]
3353 async fn test_execute_attach_stale_session() {
3354 use axum::{Router, routing::get};
3355 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3356 let app = Router::new().route(
3357 "/api/v1/sessions/{id}",
3358 get(move || async move { session_json.to_owned() }),
3359 );
3360 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3361 let addr = listener.local_addr().unwrap();
3362 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3363
3364 let cli = Cli {
3365 node: format!("127.0.0.1:{}", addr.port()),
3366 token: None,
3367 command: Some(Commands::Attach {
3368 name: "stale-sess".into(),
3369 }),
3370 path: None,
3371 };
3372 let result = execute(&cli).await;
3373 let err = result.unwrap_err().to_string();
3374 assert!(err.contains("lost"));
3375 assert!(err.contains("pulpo resume"));
3376 }
3377
3378 #[tokio::test]
3379 async fn test_execute_attach_dead_session() {
3380 use axum::{Router, routing::get};
3381 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3382 let app = Router::new().route(
3383 "/api/v1/sessions/{id}",
3384 get(move || async move { session_json.to_owned() }),
3385 );
3386 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3387 let addr = listener.local_addr().unwrap();
3388 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3389
3390 let cli = Cli {
3391 node: format!("127.0.0.1:{}", addr.port()),
3392 token: None,
3393 command: Some(Commands::Attach {
3394 name: "dead-sess".into(),
3395 }),
3396 path: None,
3397 };
3398 let result = execute(&cli).await;
3399 let err = result.unwrap_err().to_string();
3400 assert!(err.contains("killed"));
3401 assert!(err.contains("cannot attach"));
3402 }
3403
/// The `s` alias parses into `Commands::Spawn`.
#[test]
fn test_cli_parse_alias_spawn() {
    let argv = ["pulpo", "s", "my-task", "--", "echo", "hello"];

    let parsed = Cli::try_parse_from(argv).unwrap();

    let is_spawn = matches!(&parsed.command, Some(Commands::Spawn { .. }));
    assert!(is_spawn);
}
3411
/// The `ls` alias parses into `Commands::List` with `all` off.
#[test]
fn test_cli_parse_alias_list() {
    let parsed = Cli::try_parse_from(["pulpo", "ls"]).unwrap();

    let is_plain_list = matches!(&parsed.command, Some(Commands::List { all: false }));
    assert!(is_plain_list);
}
3417
/// Both `-a` (via the `ls` alias) and `--all` (via `list`) enable the `all` flag.
#[test]
fn test_cli_parse_list_all() {
    let invocations = [["pulpo", "ls", "-a"], ["pulpo", "list", "--all"]];

    for argv in invocations {
        let parsed = Cli::try_parse_from(argv).unwrap();
        let lists_all = matches!(&parsed.command, Some(Commands::List { all: true }));
        assert!(lists_all);
    }
}
3426
/// The `l` alias parses into `Commands::Logs` with the given session name.
#[test]
fn test_cli_parse_alias_logs() {
    let parsed = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();

    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Logs { name, .. }) if name == "my-session"
    );
    assert!(is_expected);
}
3435
/// The `k` alias parses into `Commands::Kill` with the given session names.
#[test]
fn test_cli_parse_alias_kill() {
    let parsed = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();

    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Kill { names }) if names == &["my-session"]
    );
    assert!(is_expected);
}
3444
/// The `rm` alias parses into `Commands::Delete` with the given session names.
#[test]
fn test_cli_parse_alias_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();

    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Delete { names }) if names == &["my-session"]
    );
    assert!(is_expected);
}
3453
/// The `r` alias parses into `Commands::Resume` with the given session name.
#[test]
fn test_cli_parse_alias_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();

    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Resume { name }) if name == "my-session"
    );
    assert!(is_expected);
}
3462
/// The `n` alias parses into `Commands::Nodes`.
#[test]
fn test_cli_parse_alias_nodes() {
    let parsed = Cli::try_parse_from(["pulpo", "n"]).unwrap();

    let is_nodes = matches!(&parsed.command, Some(Commands::Nodes));
    assert!(is_nodes);
}
3468
/// The `iv` alias parses into `Commands::Interventions` with the given session name.
#[test]
fn test_cli_parse_alias_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();

    let is_expected = matches!(
        &parsed.command,
        Some(Commands::Interventions { name }) if name == "my-session"
    );
    assert!(is_expected);
}
3477
/// A JSON body with an `error` field is reduced to just that field's text.
#[test]
fn test_api_error_json() {
    let body = "{\"error\":\"session not found: foo\"}";

    let message = api_error(body).to_string();

    assert_eq!(message, "session not found: foo");
}
3483
/// A non-JSON body is used verbatim as the error message.
#[test]
fn test_api_error_plain_text() {
    let body = "plain text error";

    let message = api_error(body).to_string();

    assert_eq!(message, "plain text error");
}
3489
/// With no previous output, the whole new output is the diff.
#[test]
fn test_diff_output_empty_prev() {
    let delta = diff_output("", "line1\nline2\n");
    assert_eq!(delta, "line1\nline2\n");
}
3496
/// Identical previous and new output produce an empty diff.
#[test]
fn test_diff_output_identical() {
    let snapshot = "line1\nline2";
    let delta = diff_output(snapshot, snapshot);
    assert_eq!(delta, "");
}
3501
/// When the new output extends the previous one, only the appended lines are returned.
#[test]
fn test_diff_output_new_lines_appended() {
    let delta = diff_output("line1\nline2", "line1\nline2\nline3\nline4");
    assert_eq!(delta, "line3\nline4");
}
3508
/// When the window scrolls (first line dropped, one line added), only the added line is returned.
#[test]
fn test_diff_output_scrolled_window() {
    let delta = diff_output("line1\nline2\nline3", "line2\nline3\nline4");
    assert_eq!(delta, "line4");
}
3516
/// With no shared lines at all, the entire new output is returned.
#[test]
fn test_diff_output_completely_different() {
    let delta = diff_output("aaa\nbbb", "xxx\nyyy");
    assert_eq!(delta, "xxx\nyyy");
}
3523
/// The last previous line (`common`) reappears in the new output, but the line
/// before it differs (`aaa` vs `zzz`); the entire new output is returned.
#[test]
fn test_diff_output_last_line_matches_but_overlap_fails() {
    let delta = diff_output("aaa\ncommon", "zzz\ncommon\nnew_line");
    assert_eq!(delta, "zzz\ncommon\nnew_line");
}
3534
/// An empty new output produces an empty diff, whatever came before.
#[test]
fn test_diff_output_new_empty() {
    let delta = diff_output("line1", "");
    assert_eq!(delta, "");
}
3539
3540 async fn start_follow_test_server() -> String {
3545 use axum::{Router, extract::Path, extract::Query, routing::get};
3546 use std::sync::Arc;
3547 use std::sync::atomic::{AtomicUsize, Ordering};
3548
3549 let call_count = Arc::new(AtomicUsize::new(0));
3550 let output_count = call_count.clone();
3551
3552 let app = Router::new()
3553 .route(
3554 "/api/v1/sessions/{id}/output",
3555 get(
3556 move |_path: Path<String>,
3557 _query: Query<std::collections::HashMap<String, String>>| {
3558 let count = output_count.clone();
3559 async move {
3560 let n = count.fetch_add(1, Ordering::SeqCst);
3561 let output = match n {
3562 0 => "line1\nline2".to_owned(),
3563 1 => "line1\nline2\nline3".to_owned(),
3564 _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
3565 };
3566 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
3567 }
3568 },
3569 ),
3570 )
3571 .route(
3572 "/api/v1/sessions/{id}",
3573 get(|_path: Path<String>| async {
3574 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3575 }),
3576 );
3577
3578 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3579 let addr = listener.local_addr().unwrap();
3580 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3581 format!("http://127.0.0.1:{}", addr.port())
3582 }
3583
3584 #[tokio::test]
3585 async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
3586 let base = start_follow_test_server().await;
3587 let client = reqwest::Client::new();
3588 let mut buf = Vec::new();
3589
3590 follow_logs(&client, &base, "test", 100, None, &mut buf)
3591 .await
3592 .unwrap();
3593
3594 let output = String::from_utf8(buf).unwrap();
3595 assert!(output.contains("line1"));
3597 assert!(output.contains("line2"));
3598 assert!(output.contains("line3"));
3599 assert!(output.contains("line4"));
3600 assert!(output.contains("[pulpo] Agent exited"));
3601 }
3602
3603 #[tokio::test]
3604 async fn test_execute_logs_follow_success() {
3605 let base = start_follow_test_server().await;
3606 let node = base.strip_prefix("http://").unwrap().to_owned();
3608
3609 let cli = Cli {
3610 node,
3611 token: None,
3612 command: Some(Commands::Logs {
3613 name: "test".into(),
3614 lines: 100,
3615 follow: true,
3616 }),
3617 path: None,
3618 };
3619 let result = execute(&cli).await.unwrap();
3621 assert_eq!(result, "");
3622 }
3623
3624 #[tokio::test]
3625 async fn test_execute_logs_follow_connection_refused() {
3626 let cli = Cli {
3627 node: "localhost:1".into(),
3628 token: None,
3629 command: Some(Commands::Logs {
3630 name: "test".into(),
3631 lines: 50,
3632 follow: true,
3633 }),
3634 path: None,
3635 };
3636 let result = execute(&cli).await;
3637 let err = result.unwrap_err().to_string();
3638 assert!(
3639 err.contains("Could not connect to pulpod"),
3640 "Expected friendly error, got: {err}"
3641 );
3642 }
3643
3644 #[tokio::test]
3645 async fn test_follow_logs_exits_on_dead() {
3646 use axum::{Router, extract::Path, extract::Query, routing::get};
3647
3648 let app = Router::new()
3649 .route(
3650 "/api/v1/sessions/{id}/output",
3651 get(
3652 |_path: Path<String>,
3653 _query: Query<std::collections::HashMap<String, String>>| async {
3654 r#"{"output":"some output"}"#.to_owned()
3655 },
3656 ),
3657 )
3658 .route(
3659 "/api/v1/sessions/{id}",
3660 get(|_path: Path<String>| async {
3661 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3662 }),
3663 );
3664
3665 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3666 let addr = listener.local_addr().unwrap();
3667 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3668 let base = format!("http://127.0.0.1:{}", addr.port());
3669
3670 let client = reqwest::Client::new();
3671 let mut buf = Vec::new();
3672 follow_logs(&client, &base, "test", 100, None, &mut buf)
3673 .await
3674 .unwrap();
3675
3676 let output = String::from_utf8(buf).unwrap();
3677 assert!(output.contains("some output"));
3678 }
3679
3680 #[tokio::test]
3681 async fn test_follow_logs_exits_on_stale() {
3682 use axum::{Router, extract::Path, extract::Query, routing::get};
3683
3684 let app = Router::new()
3685 .route(
3686 "/api/v1/sessions/{id}/output",
3687 get(
3688 |_path: Path<String>,
3689 _query: Query<std::collections::HashMap<String, String>>| async {
3690 r#"{"output":"stale output"}"#.to_owned()
3691 },
3692 ),
3693 )
3694 .route(
3695 "/api/v1/sessions/{id}",
3696 get(|_path: Path<String>| async {
3697 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3698 }),
3699 );
3700
3701 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3702 let addr = listener.local_addr().unwrap();
3703 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3704 let base = format!("http://127.0.0.1:{}", addr.port());
3705
3706 let client = reqwest::Client::new();
3707 let mut buf = Vec::new();
3708 follow_logs(&client, &base, "test", 100, None, &mut buf)
3709 .await
3710 .unwrap();
3711
3712 let output = String::from_utf8(buf).unwrap();
3713 assert!(output.contains("stale output"));
3714 }
3715
3716 #[tokio::test]
3717 async fn test_execute_logs_follow_non_reqwest_error() {
3718 use axum::{Router, extract::Path, extract::Query, routing::get};
3719
3720 let app = Router::new()
3722 .route(
3723 "/api/v1/sessions/{id}/output",
3724 get(
3725 |_path: Path<String>,
3726 _query: Query<std::collections::HashMap<String, String>>| async {
3727 r#"{"output":"initial"}"#.to_owned()
3728 },
3729 ),
3730 )
3731 .route(
3732 "/api/v1/sessions/{id}",
3733 get(|_path: Path<String>| async { "not valid json".to_owned() }),
3734 );
3735
3736 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3737 let addr = listener.local_addr().unwrap();
3738 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3739 let node = format!("127.0.0.1:{}", addr.port());
3740
3741 let cli = Cli {
3742 node,
3743 token: None,
3744 command: Some(Commands::Logs {
3745 name: "test".into(),
3746 lines: 100,
3747 follow: true,
3748 }),
3749 path: None,
3750 };
3751 let err = execute(&cli).await.unwrap_err();
3752 let msg = err.to_string();
3754 assert!(
3755 msg.contains("expected ident"),
3756 "Expected serde parse error, got: {msg}"
3757 );
3758 }
3759
3760 #[tokio::test]
3761 async fn test_fetch_session_status_connection_error() {
3762 let client = reqwest::Client::new();
3763 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3764 assert!(result.is_err());
3765 }
3766
3767 #[test]
3770 fn test_format_schedules_empty() {
3771 assert_eq!(format_schedules(&[]), "No schedules.");
3772 }
3773
3774 #[test]
3775 fn test_format_schedules_with_entries() {
3776 let schedules = vec![serde_json::json!({
3777 "name": "nightly",
3778 "cron": "0 3 * * *",
3779 "enabled": true,
3780 "last_run_at": null,
3781 "target_node": null
3782 })];
3783 let output = format_schedules(&schedules);
3784 assert!(output.contains("nightly"));
3785 assert!(output.contains("0 3 * * *"));
3786 assert!(output.contains("local"));
3787 assert!(output.contains("yes"));
3788 assert!(output.contains('-'));
3789 }
3790
3791 #[test]
3792 fn test_format_schedules_disabled_entry() {
3793 let schedules = vec![serde_json::json!({
3794 "name": "weekly",
3795 "cron": "0 0 * * 0",
3796 "enabled": false,
3797 "last_run_at": "2026-03-18T03:00:00Z",
3798 "target_node": "gpu-box"
3799 })];
3800 let output = format_schedules(&schedules);
3801 assert!(output.contains("weekly"));
3802 assert!(output.contains("no"));
3803 assert!(output.contains("gpu-box"));
3804 assert!(output.contains("2026-03-18T03:00"));
3805 }
3806
3807 #[test]
3808 fn test_format_schedules_header() {
3809 let schedules = vec![serde_json::json!({
3810 "name": "test",
3811 "cron": "* * * * *",
3812 "enabled": true,
3813 "last_run_at": null,
3814 "target_node": null
3815 })];
3816 let output = format_schedules(&schedules);
3817 assert!(output.contains("NAME"));
3818 assert!(output.contains("CRON"));
3819 assert!(output.contains("ENABLED"));
3820 assert!(output.contains("LAST RUN"));
3821 assert!(output.contains("NODE"));
3822 }
3823
3824 #[test]
3827 fn test_cli_parse_schedule_add() {
3828 let cli = Cli::try_parse_from([
3829 "pulpo",
3830 "schedule",
3831 "add",
3832 "nightly",
3833 "0 3 * * *",
3834 "--workdir",
3835 "/repo",
3836 "--",
3837 "claude",
3838 "-p",
3839 "review",
3840 ])
3841 .unwrap();
3842 assert!(matches!(
3843 &cli.command,
3844 Some(Commands::Schedule {
3845 action: ScheduleAction::Add { name, cron, .. }
3846 }) if name == "nightly" && cron == "0 3 * * *"
3847 ));
3848 }
3849
3850 #[test]
3851 fn test_cli_parse_schedule_add_with_node() {
3852 let cli = Cli::try_parse_from([
3853 "pulpo",
3854 "schedule",
3855 "add",
3856 "nightly",
3857 "0 3 * * *",
3858 "--workdir",
3859 "/repo",
3860 "--node",
3861 "gpu-box",
3862 "--",
3863 "claude",
3864 ])
3865 .unwrap();
3866 assert!(matches!(
3867 &cli.command,
3868 Some(Commands::Schedule {
3869 action: ScheduleAction::Add { node, .. }
3870 }) if node.as_deref() == Some("gpu-box")
3871 ));
3872 }
3873
3874 #[test]
3875 fn test_cli_parse_schedule_add_install_alias() {
3876 let cli =
3877 Cli::try_parse_from(["pulpo", "schedule", "install", "nightly", "0 3 * * *"]).unwrap();
3878 assert!(matches!(
3879 &cli.command,
3880 Some(Commands::Schedule {
3881 action: ScheduleAction::Add { name, .. }
3882 }) if name == "nightly"
3883 ));
3884 }
3885
3886 #[test]
3887 fn test_cli_parse_schedule_list() {
3888 let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3889 assert!(matches!(
3890 &cli.command,
3891 Some(Commands::Schedule {
3892 action: ScheduleAction::List
3893 })
3894 ));
3895 }
3896
3897 #[test]
3898 fn test_cli_parse_schedule_remove() {
3899 let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3900 assert!(matches!(
3901 &cli.command,
3902 Some(Commands::Schedule {
3903 action: ScheduleAction::Remove { name }
3904 }) if name == "nightly"
3905 ));
3906 }
3907
3908 #[test]
3909 fn test_cli_parse_schedule_pause() {
3910 let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3911 assert!(matches!(
3912 &cli.command,
3913 Some(Commands::Schedule {
3914 action: ScheduleAction::Pause { name }
3915 }) if name == "nightly"
3916 ));
3917 }
3918
3919 #[test]
3920 fn test_cli_parse_schedule_resume() {
3921 let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3922 assert!(matches!(
3923 &cli.command,
3924 Some(Commands::Schedule {
3925 action: ScheduleAction::Resume { name }
3926 }) if name == "nightly"
3927 ));
3928 }
3929
3930 #[test]
3931 fn test_cli_parse_schedule_alias() {
3932 let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3933 assert!(matches!(
3934 &cli.command,
3935 Some(Commands::Schedule {
3936 action: ScheduleAction::List
3937 })
3938 ));
3939 }
3940
3941 #[test]
3942 fn test_cli_parse_schedule_list_alias() {
3943 let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3944 assert!(matches!(
3945 &cli.command,
3946 Some(Commands::Schedule {
3947 action: ScheduleAction::List
3948 })
3949 ));
3950 }
3951
3952 #[test]
3953 fn test_cli_parse_schedule_remove_alias() {
3954 let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3955 assert!(matches!(
3956 &cli.command,
3957 Some(Commands::Schedule {
3958 action: ScheduleAction::Remove { name }
3959 }) if name == "nightly"
3960 ));
3961 }
3962
3963 #[tokio::test]
3964 async fn test_execute_schedule_list_via_execute() {
3965 let node = start_test_server().await;
3966 let cli = Cli {
3967 node,
3968 token: None,
3969 command: Some(Commands::Schedule {
3970 action: ScheduleAction::List,
3971 }),
3972 path: None,
3973 };
3974 let result = execute(&cli).await.unwrap();
3975 #[cfg(coverage)]
3977 assert!(result.is_empty());
3978 #[cfg(not(coverage))]
3979 assert_eq!(result, "No schedules.");
3980 }
3981
3982 #[test]
3983 fn test_schedule_action_debug() {
3984 let action = ScheduleAction::List;
3985 assert_eq!(format!("{action:?}"), "List");
3986 }
3987
3988 #[test]
3989 fn test_cli_parse_send_alias() {
3990 let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
3991 assert!(matches!(
3992 &cli.command,
3993 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
3994 ));
3995 }
3996
3997 #[test]
3998 fn test_cli_parse_spawn_no_name() {
3999 let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
4000 assert!(matches!(
4001 &cli.command,
4002 Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
4003 ));
4004 }
4005
4006 #[test]
4007 fn test_cli_parse_spawn_optional_name_with_command() {
4008 let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
4009 assert!(matches!(
4010 &cli.command,
4011 Some(Commands::Spawn { name, command, .. })
4012 if name.is_none() && command == &["echo", "hello"]
4013 ));
4014 }
4015
4016 #[test]
4017 fn test_cli_parse_path_shortcut() {
4018 let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
4019 assert!(cli.command.is_none());
4020 assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
4021 }
4022
4023 #[test]
4024 fn test_cli_parse_no_args() {
4025 let cli = Cli::try_parse_from(["pulpo"]).unwrap();
4026 assert!(cli.command.is_none());
4027 assert!(cli.path.is_none());
4028 }
4029
4030 #[test]
4031 fn test_derive_session_name_simple() {
4032 assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
4033 }
4034
4035 #[test]
4036 fn test_derive_session_name_with_special_chars() {
4037 assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
4038 }
4039
4040 #[test]
4041 fn test_derive_session_name_root() {
4042 assert_eq!(derive_session_name("/"), "session");
4043 }
4044
4045 #[test]
4046 fn test_derive_session_name_dots() {
4047 assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
4048 }
4049
4050 #[test]
4051 fn test_resolve_path_absolute() {
4052 assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
4053 }
4054
4055 #[test]
4056 fn test_resolve_path_relative() {
4057 let resolved = resolve_path("my-repo");
4058 assert!(resolved.ends_with("my-repo"));
4059 assert!(resolved.starts_with('/'));
4060 }
4061
4062 #[tokio::test]
4063 async fn test_execute_no_args_shows_help() {
4064 let node = start_test_server().await;
4065 let cli = Cli {
4066 node,
4067 token: None,
4068 path: None,
4069 command: None,
4070 };
4071 let result = execute(&cli).await.unwrap();
4072 assert!(
4073 result.is_empty(),
4074 "no-args should return empty string after printing help"
4075 );
4076 }
4077
4078 #[tokio::test]
4079 async fn test_execute_path_shortcut() {
4080 let node = start_test_server().await;
4081 let cli = Cli {
4082 node,
4083 token: None,
4084 path: Some("/tmp".into()),
4085 command: None,
4086 };
4087 let result = execute(&cli).await.unwrap();
4088 assert!(result.contains("Detached from session"));
4089 }
4090
4091 #[tokio::test]
4092 async fn test_deduplicate_session_name_no_conflict() {
4093 let base = "http://127.0.0.1:1";
4095 let client = reqwest::Client::new();
4096 let name = deduplicate_session_name(&client, base, "fresh", None).await;
4097 assert_eq!(name, "fresh");
4098 }
4099
4100 #[tokio::test]
4101 async fn test_deduplicate_session_name_with_conflict() {
4102 use axum::{Router, routing::get};
4103 use std::sync::atomic::{AtomicU32, Ordering};
4104
4105 let call_count = std::sync::Arc::new(AtomicU32::new(0));
4106 let counter = call_count.clone();
4107 let app = Router::new()
4108 .route(
4109 "/api/v1/sessions/{id}",
4110 get(move || {
4111 let c = counter.clone();
4112 async move {
4113 let n = c.fetch_add(1, Ordering::SeqCst);
4114 if n == 0 {
4115 (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4117 } else {
4118 (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4120 }
4121 }
4122 }),
4123 )
4124 .route(
4125 "/api/v1/peers",
4126 get(|| async {
4127 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4128 }),
4129 );
4130 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4131 let addr = listener.local_addr().unwrap();
4132 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4133 let base = format!("http://127.0.0.1:{}", addr.port());
4134 let client = reqwest::Client::new();
4135 let name = deduplicate_session_name(&client, &base, "repo", None).await;
4136 assert_eq!(name, "repo-2");
4137 }
4138
4139 #[test]
4142 fn test_node_needs_resolution() {
4143 assert!(!node_needs_resolution("localhost:7433"));
4144 assert!(!node_needs_resolution("mac-mini:7433"));
4145 assert!(!node_needs_resolution("10.0.0.1:7433"));
4146 assert!(!node_needs_resolution("[::1]:7433"));
4147 assert!(node_needs_resolution("mac-mini"));
4148 assert!(node_needs_resolution("linux-server"));
4149 assert!(node_needs_resolution("localhost"));
4150 }
4151
4152 #[tokio::test]
4153 async fn test_resolve_node_with_port() {
4154 let client = reqwest::Client::new();
4155 let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4156 assert_eq!(addr, "mac-mini:7433");
4157 assert!(token.is_none());
4158 }
4159
4160 #[tokio::test]
4161 async fn test_resolve_node_fallback_appends_port() {
4162 let client = reqwest::Client::new();
4165 let (addr, token) = resolve_node(&client, "unknown-host").await;
4166 assert_eq!(addr, "unknown-host:7433");
4167 assert!(token.is_none());
4168 }
4169
4170 #[cfg(not(coverage))]
4171 #[tokio::test]
4172 async fn test_resolve_node_finds_peer() {
4173 use axum::{Router, routing::get};
4174
4175 let app = Router::new()
4176 .route(
4177 "/api/v1/peers",
4178 get(|| async {
4179 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4180 }),
4181 )
4182 .route(
4183 "/api/v1/config",
4184 get(|| async {
4185 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4186 }),
4187 );
4188
4189 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4191 return;
4192 };
4193 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4194
4195 let client = reqwest::Client::new();
4196 let (addr, token) = resolve_node(&client, "mac-mini").await;
4197 assert_eq!(addr, "10.0.0.5:7433");
4198 assert_eq!(token, Some("peer-secret".into()));
4199 }
4200
4201 #[cfg(not(coverage))]
4202 #[tokio::test]
4203 async fn test_resolve_node_peer_no_token() {
4204 use axum::{Router, routing::get};
4205
4206 let app = Router::new()
4207 .route(
4208 "/api/v1/peers",
4209 get(|| async {
4210 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4211 }),
4212 )
4213 .route(
4214 "/api/v1/config",
4215 get(|| async {
4216 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4217 }),
4218 );
4219
4220 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4221 return; };
4223 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4224
4225 let client = reqwest::Client::new();
4226 let (addr, token) = resolve_node(&client, "test-peer").await;
4227 assert_eq!(addr, "10.0.0.9:7433");
4228 assert!(token.is_none()); }
4230
4231 #[tokio::test]
4232 async fn test_execute_with_peer_name_resolution() {
4233 let cli = Cli {
4237 node: "nonexistent-peer".into(),
4238 token: None,
4239 command: Some(Commands::List { all: false }),
4240 path: None,
4241 };
4242 let result = execute(&cli).await;
4243 assert!(result.is_err());
4245 }
4246
4247 #[test]
4250 fn test_cli_parse_spawn_auto() {
4251 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
4252 assert!(matches!(
4253 &cli.command,
4254 Some(Commands::Spawn { auto, .. }) if *auto
4255 ));
4256 }
4257
4258 #[test]
4259 fn test_cli_parse_spawn_auto_default() {
4260 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
4261 assert!(matches!(
4262 &cli.command,
4263 Some(Commands::Spawn { auto, .. }) if !auto
4264 ));
4265 }
4266
4267 #[tokio::test]
4268 async fn test_select_best_node_coverage_stub() {
4269 let client = reqwest::Client::new();
4271 let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4274 }
4275
4276 #[cfg(not(coverage))]
4277 #[tokio::test]
4278 async fn test_select_best_node_picks_least_loaded() {
4279 use axum::{Router, routing::get};
4280
4281 let app = Router::new().route(
4282 "/api/v1/peers",
4283 get(|| async {
4284 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4285 }),
4286 );
4287 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4288 let addr = listener.local_addr().unwrap();
4289 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4290 let base = format!("http://127.0.0.1:{}", addr.port());
4291
4292 let client = reqwest::Client::new();
4293 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4294 assert_eq!(name, "idle");
4298 assert_eq!(addr, "idle:7433");
4299 }
4300
4301 #[cfg(not(coverage))]
4302 #[tokio::test]
4303 async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4304 use axum::{Router, routing::get};
4305
4306 let app = Router::new().route(
4307 "/api/v1/peers",
4308 get(|| async {
4309 r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4310 }),
4311 );
4312 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4313 let addr = listener.local_addr().unwrap();
4314 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4315 let base = format!("http://127.0.0.1:{}", addr.port());
4316
4317 let client = reqwest::Client::new();
4318 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4319 assert_eq!(name, "my-mac");
4320 assert_eq!(addr, "localhost:7433");
4321 }
4322
4323 #[cfg(not(coverage))]
4324 #[tokio::test]
4325 async fn test_select_best_node_empty_peers_falls_back_to_local() {
4326 use axum::{Router, routing::get};
4327
4328 let app = Router::new().route(
4329 "/api/v1/peers",
4330 get(|| async {
4331 r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4332 }),
4333 );
4334 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4335 let addr = listener.local_addr().unwrap();
4336 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4337 let base = format!("http://127.0.0.1:{}", addr.port());
4338
4339 let client = reqwest::Client::new();
4340 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4341 assert_eq!(name, "solo");
4342 assert_eq!(addr, "localhost:7433");
4343 }
4344
    /// End-to-end check of `spawn --auto`: the mock server advertises a single
    /// online peer named `remote` whose address is the mock server itself, and
    /// answers session creation with `201 Created`. `execute` must return text
    /// containing "Created session".
    #[cfg(not(coverage))]
    #[tokio::test]
    async fn test_execute_spawn_auto_selects_node() {
        use axum::{
            Router,
            http::StatusCode,
            routing::{get, post},
        };

        let create_json = test_create_response_json();

        // Bind first so the chosen port can be embedded in the peers payload.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let node = format!("127.0.0.1:{}", addr.port());
        let peer_addr = node.clone();

        let app = Router::new()
            .route(
                "/api/v1/peers",
                get(move || {
                    // Each request gets its own copy of the address to move into the future.
                    let peer_addr = peer_addr.clone();
                    async move {
                        // The advertised peer address points back at this same server.
                        format!(
                            r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
                        )
                    }
                }),
            )
            .route(
                "/api/v1/sessions",
                post(move || async move {
                    (StatusCode::CREATED, create_json.clone())
                }),
            );

        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });

        let cli = Cli {
            node,
            token: None,
            command: Some(Commands::Spawn {
                name: Some("test".into()),
                workdir: Some("/tmp/repo".into()),
                ink: None,
                description: None,
                // Detached, so the command returns after creating the session.
                detach: true,
                idle_threshold: None,
                auto: true,
                worktree: false,
                runtime: None,
                secret: vec![],
                command: vec!["echo".into(), "hello".into()],
            }),
            path: None,
        };
        let result = execute(&cli).await.unwrap();
        assert!(result.contains("Created session"));
    }
4404
4405 #[cfg(not(coverage))]
4406 #[tokio::test]
4407 async fn test_select_best_node_peer_no_session_count() {
4408 use axum::{Router, routing::get};
4409
4410 let app = Router::new().route(
4411 "/api/v1/peers",
4412 get(|| async {
4413 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4414 }),
4415 );
4416 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4417 let addr = listener.local_addr().unwrap();
4418 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4419 let base = format!("http://127.0.0.1:{}", addr.port());
4420
4421 let client = reqwest::Client::new();
4422 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4423 assert_eq!(name, "fresh");
4425 assert_eq!(addr, "fresh:7433");
4426 }
4427
4428 #[test]
4431 fn test_cli_parse_secret_set() {
4432 let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_TOKEN", "abc123"]).unwrap();
4433 assert!(matches!(
4434 &cli.command,
4435 Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
4436 if name == "MY_TOKEN" && value == "abc123" && env.is_none()
4437 ));
4438 }
4439
4440 #[test]
4441 fn test_cli_parse_secret_list() {
4442 let cli = Cli::try_parse_from(["pulpo", "secret", "list"]).unwrap();
4443 assert!(matches!(
4444 &cli.command,
4445 Some(Commands::Secret {
4446 action: SecretAction::List
4447 })
4448 ));
4449 }
4450
4451 #[test]
4452 fn test_cli_parse_secret_list_alias() {
4453 let cli = Cli::try_parse_from(["pulpo", "secret", "ls"]).unwrap();
4454 assert!(matches!(
4455 &cli.command,
4456 Some(Commands::Secret {
4457 action: SecretAction::List
4458 })
4459 ));
4460 }
4461
4462 #[test]
4463 fn test_cli_parse_secret_delete() {
4464 let cli = Cli::try_parse_from(["pulpo", "secret", "delete", "MY_TOKEN"]).unwrap();
4465 assert!(matches!(
4466 &cli.command,
4467 Some(Commands::Secret { action: SecretAction::Delete { name } })
4468 if name == "MY_TOKEN"
4469 ));
4470 }
4471
4472 #[test]
4473 fn test_cli_parse_secret_delete_alias() {
4474 let cli = Cli::try_parse_from(["pulpo", "secret", "rm", "MY_TOKEN"]).unwrap();
4475 assert!(matches!(
4476 &cli.command,
4477 Some(Commands::Secret { action: SecretAction::Delete { name } })
4478 if name == "MY_TOKEN"
4479 ));
4480 }
4481
4482 #[test]
4483 fn test_cli_parse_secret_alias() {
4484 let cli = Cli::try_parse_from(["pulpo", "sec", "list"]).unwrap();
4485 assert!(matches!(
4486 &cli.command,
4487 Some(Commands::Secret {
4488 action: SecretAction::List
4489 })
4490 ));
4491 }
4492
4493 #[test]
4494 fn test_format_secrets_empty() {
4495 let secrets: Vec<serde_json::Value> = vec![];
4496 assert_eq!(format_secrets(&secrets), "No secrets configured.");
4497 }
4498
4499 #[test]
4500 fn test_format_secrets_with_entries() {
4501 let secrets = vec![
4502 serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4503 serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4504 ];
4505 let output = format_secrets(&secrets);
4506 assert!(output.contains("GITHUB_TOKEN"));
4507 assert!(output.contains("NPM_TOKEN"));
4508 assert!(output.contains("NAME"));
4509 assert!(output.contains("ENV"));
4510 assert!(output.contains("CREATED"));
4511 }
4512
4513 #[test]
4514 fn test_format_secrets_with_env() {
4515 let secrets = vec![
4516 serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4517 serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4518 ];
4519 let output = format_secrets(&secrets);
4520 assert!(output.contains("GH_WORK"));
4521 assert!(output.contains("GITHUB_TOKEN"));
4522 assert!(output.contains("NPM_TOKEN"));
4523 }
4524
4525 #[test]
4526 fn test_format_secrets_short_timestamp() {
4527 let secrets = vec![serde_json::json!({"name": "KEY", "created_at": "now"})];
4528 let output = format_secrets(&secrets);
4529 assert!(output.contains("now"));
4530 }
4531
4532 #[test]
4533 fn test_format_schedules_short_last_run_at() {
4534 let schedules = vec![serde_json::json!({
4536 "name": "test",
4537 "cron": "* * * * *",
4538 "enabled": true,
4539 "last_run_at": "short",
4540 "target_node": null
4541 })];
4542 let output = format_schedules(&schedules);
4543 assert!(output.contains("short"));
4544 }
4545
4546 #[test]
4547 fn test_format_sessions_multibyte_command_truncation() {
4548 use chrono::Utc;
4549 use pulpo_common::session::SessionStatus;
4550 use uuid::Uuid;
4551
4552 let sessions = vec![Session {
4554 id: Uuid::nil(),
4555 name: "test".into(),
4556 workdir: "/tmp".into(),
4557 command: "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'".into(),
4558 description: None,
4559 status: SessionStatus::Active,
4560 exit_code: None,
4561 backend_session_id: None,
4562 output_snapshot: None,
4563 metadata: None,
4564 ink: None,
4565 intervention_code: None,
4566 intervention_reason: None,
4567 intervention_at: None,
4568 last_output_at: None,
4569 idle_since: None,
4570 idle_threshold_secs: None,
4571 worktree_path: None,
4572 runtime: Runtime::Tmux,
4573 created_at: Utc::now(),
4574 updated_at: Utc::now(),
4575 }];
4576 let output = format_sessions(&sessions);
4577 assert!(output.contains("..."));
4578 }
4579}