1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5 AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6 PeersResponse,
7};
8use pulpo_common::session::{Runtime, Session};
9
// Top-level command-line arguments for the `pulpo` client.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros turn
// `///` doc comments into `--help` text, so adding them would change the
// program's output.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version = env!("PULPO_VERSION"),
    args_conflicts_with_subcommands = true
)]
pub struct Cli {
    // Node to talk to. A value without a `:` is treated as a peer name and is
    // resolved by `resolve_node`; otherwise it is used as `host:port`.
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    // Explicit bearer token; when absent `resolve_token` may discover one
    // from a localhost daemon.
    #[arg(long)]
    pub token: Option<String>,

    // Subcommand to run. When `None`, `execute` falls back to `path` or
    // prints help.
    #[command(subcommand)]
    pub command: Option<Commands>,

    // Optional positional path; `execute` creates a session in this
    // directory and attaches to it when no subcommand is given.
    #[arg(value_name = "PATH")]
    pub path: Option<String>,
}
33
// Subcommands understood by the CLI; each variant is handled by one arm of
// `execute`.
//
// NOTE: `//` comments are used instead of `///` because clap would surface doc
// comments as help text, changing runtime output.
#[derive(Subcommand, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    // Attach the terminal to an existing session (alias `a`).
    #[command(visible_alias = "a")]
    Attach {
        // Session name used in the `/api/v1/sessions/{name}` lookup.
        name: String,
    },

    // Send text to a session (aliases `i`, `send`).
    #[command(visible_alias = "i", visible_alias = "send")]
    Input {
        // Target session name.
        name: String,
        // Text to send; `execute` sends a single newline when omitted.
        text: Option<String>,
    },

    // Create a new session (alias `s`).
    #[command(visible_alias = "s")]
    Spawn {
        // Session name; derived from the working directory when omitted.
        name: Option<String>,

        // Working directory; defaults to the current directory.
        #[arg(long)]
        workdir: Option<String>,

        // Forwarded verbatim as the `ink` field of the create request.
        #[arg(long)]
        ink: Option<String>,

        // Forwarded as the `description` field of the create request.
        #[arg(long)]
        description: Option<String>,

        // Do not attach after creating the session.
        #[arg(short, long)]
        detach: bool,

        // Forwarded as `idle_threshold_secs` in the create request.
        #[arg(long)]
        idle_threshold: Option<u32>,

        // Let `select_best_node` pick the node that receives the session.
        #[arg(long)]
        auto: bool,

        // Sets `worktree: true` in the create request.
        #[arg(long)]
        worktree: bool,

        // Forwarded as the `runtime` field of the create request.
        #[arg(long)]
        runtime: Option<String>,

        // Repeatable; forwarded as the `secrets` array of the create request.
        #[arg(long)]
        secret: Vec<String>,

        // Everything after `--`; joined with single spaces into the command.
        #[arg(last = true)]
        command: Vec<String>,
    },

    // List sessions (alias `ls`).
    #[command(visible_alias = "ls")]
    List {
        // Without this flag only creating/active/idle/ready sessions are requested.
        #[arg(short, long)]
        all: bool,
    },

    // Show session output (alias `l`).
    #[command(visible_alias = "l")]
    Logs {
        // Session name.
        name: String,

        // Number of output lines requested from the server.
        #[arg(long, default_value = "100")]
        lines: usize,

        // Keep polling and print new output as it appears.
        #[arg(short, long)]
        follow: bool,
    },

    // Kill a session (alias `k`).
    #[command(visible_alias = "k")]
    Kill {
        // Session name.
        name: String,
    },

    // Delete a session (alias `rm`).
    #[command(visible_alias = "rm")]
    Delete {
        // Session name.
        name: String,
    },

    // Resume a session and attach to it (alias `r`).
    #[command(visible_alias = "r")]
    Resume {
        // Session name.
        name: String,
    },

    // List the local node and its known peers (alias `n`).
    #[command(visible_alias = "n")]
    Nodes,

    // List intervention events recorded for a session (alias `iv`).
    #[command(visible_alias = "iv")]
    Interventions {
        // Session name.
        name: String,
    },

    // Open the node's base URL in the system browser.
    Ui,

    // Manage schedules (alias `sched`).
    #[command(visible_alias = "sched")]
    Schedule {
        #[command(subcommand)]
        action: ScheduleAction,
    },

    // Manage secrets (alias `sec`).
    #[command(visible_alias = "sec")]
    Secret {
        #[command(subcommand)]
        action: SecretAction,
    },
}
172
// Actions of the `secret` subcommand, dispatched by `execute_secret`.
//
// NOTE: `//` comments are used because clap would turn `///` into help text.
#[derive(Subcommand, Debug)]
pub enum SecretAction {
    // Create or overwrite a secret via `PUT /api/v1/secrets/{name}`.
    Set {
        // Secret name (part of the request URL).
        name: String,
        // Secret value, sent as the `value` field.
        value: String,
        // Optional value sent as the `env` field of the request.
        #[arg(long)]
        env: Option<String>,
    },
    // List configured secrets (alias `ls`).
    #[command(visible_alias = "ls")]
    List,
    // Delete a secret (alias `rm`).
    #[command(visible_alias = "rm")]
    Delete {
        // Secret name.
        name: String,
    },
}
195
// Actions of the `schedule` subcommand, dispatched by `execute_schedule`.
//
// NOTE: `//` comments are used because clap would turn `///` into help text.
// The aliases here use `alias` (hidden) rather than `visible_alias`.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    // Create a schedule via `POST /api/v1/schedules` (hidden alias `install`).
    #[command(alias = "install")]
    Add {
        // Schedule name.
        name: String,
        // Cron expression, sent as the `cron` field.
        cron: String,
        // Working directory; defaults to the current directory.
        #[arg(long)]
        workdir: Option<String>,
        // Sent as the `target_node` field of the request.
        #[arg(long)]
        node: Option<String>,
        // Sent verbatim as the `ink` field of the request.
        #[arg(long)]
        ink: Option<String>,
        // Sent as the `description` field of the request.
        #[arg(long)]
        description: Option<String>,
        // Everything after `--`; joined with single spaces into the command.
        #[arg(last = true)]
        command: Vec<String>,
    },
    // List schedules (hidden alias `ls`).
    #[command(alias = "ls")]
    List,
    // Delete a schedule (hidden alias `rm`).
    #[command(alias = "rm")]
    Remove {
        // Schedule name.
        name: String,
    },
    // Disable a schedule by sending `enabled: false`.
    Pause {
        // Schedule name.
        name: String,
    },
    // Re-enable a schedule by sending `enabled: true`.
    Resume {
        // Schedule name.
        name: String,
    },
}
241
/// Text whose presence in fetched session output makes `follow_logs` stop polling.
const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
244
/// Turns `path` into an absolute path string.
///
/// Absolute inputs are returned unchanged. Relative inputs are joined onto the
/// current working directory; if the working directory cannot be determined the
/// input is returned as-is.
fn resolve_path(path: &str) -> String {
    let candidate = std::path::Path::new(path);
    if candidate.is_absolute() {
        return path.to_owned();
    }
    match std::env::current_dir() {
        Ok(cwd) => cwd.join(candidate).to_string_lossy().into_owned(),
        Err(_) => path.to_owned(),
    }
}
257
/// Derives a kebab-case session name from the final component of `path`.
///
/// ASCII letters and digits are lower-cased; every other character becomes a
/// dash, runs of dashes collapse to one, and leading/trailing dashes are
/// dropped. Falls back to `"session"` when the path has no usable final
/// component or nothing remains after cleaning.
fn derive_session_name(path: &str) -> String {
    let basename = std::path::Path::new(path)
        .file_name()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("session");

    // Build the slug in one pass: a dash is only appended when the previous
    // character is not already a dash, which collapses consecutive separators.
    let mut slug = String::with_capacity(basename.len());
    for ch in basename.chars() {
        if ch.is_ascii_alphanumeric() {
            slug.push(ch.to_ascii_lowercase());
        } else if !slug.ends_with('-') {
            slug.push('-');
        }
    }

    match slug.trim_matches('-') {
        "" => "session".to_owned(),
        trimmed => trimmed.to_owned(),
    }
}
290
291async fn deduplicate_session_name(
293 client: &reqwest::Client,
294 base: &str,
295 name: &str,
296 token: Option<&str>,
297) -> String {
298 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
300 .send()
301 .await;
302 match resp {
303 Ok(r) if r.status().is_success() => {
304 for i in 2..=99 {
306 let candidate = format!("{name}-{i}");
307 let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
308 .send()
309 .await;
310 match resp {
311 Ok(r) if r.status().is_success() => {}
312 _ => return candidate,
313 }
314 }
315 format!("{name}-100")
316 }
317 _ => name.to_owned(),
318 }
319}
320
/// Builds the HTTP base URL for a `host:port` node address.
pub fn base_url(node: &str) -> String {
    ["http://", node].concat()
}
325
/// JSON body of the session output endpoint, as parsed by `fetch_output`.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// Captured session output text.
    output: String,
}
331
332const fn session_runtime(session: &Session) -> &'static str {
334 match session.runtime {
335 Runtime::Tmux => "tmux",
336 Runtime::Docker => "docker",
337 }
338}
339
340fn format_sessions(sessions: &[Session]) -> String {
341 if sessions.is_empty() {
342 return "No sessions.".into();
343 }
344 let mut lines = vec![format!(
345 "{:<10} {:<24} {:<12} {:<8} {}",
346 "ID", "NAME", "STATUS", "RUNTIME", "COMMAND"
347 )];
348 for s in sessions {
349 let cmd_display = if s.command.len() > 50 {
350 let truncated: String = s.command.chars().take(47).collect();
351 format!("{truncated}...")
352 } else {
353 s.command.clone()
354 };
355 let short_id = &s.id.to_string()[..8];
356 let has_pr = s
357 .metadata
358 .as_ref()
359 .is_some_and(|m| m.contains_key("pr_url"));
360 let name_display = match (s.worktree_path.is_some(), has_pr) {
361 (true, true) => format!("{} [wt] [PR]", s.name),
362 (true, false) => format!("{} [wt]", s.name),
363 (false, true) => format!("{} [PR]", s.name),
364 (false, false) => s.name.clone(),
365 };
366 lines.push(format!(
367 "{:<10} {:<24} {:<12} {:<8} {}",
368 short_id,
369 name_display,
370 s.status,
371 session_runtime(s),
372 cmd_display
373 ));
374 }
375 lines.join("\n")
376}
377
378fn format_nodes(resp: &PeersResponse) -> String {
380 let mut lines = vec![format!(
381 "{:<20} {:<25} {:<10} {}",
382 "NAME", "ADDRESS", "STATUS", "SESSIONS"
383 )];
384 lines.push(format!(
385 "{:<20} {:<25} {:<10} {}",
386 resp.local.name, "(local)", "online", "-"
387 ));
388 for p in &resp.peers {
389 let sessions = p
390 .session_count
391 .map_or_else(|| "-".into(), |c| c.to_string());
392 lines.push(format!(
393 "{:<20} {:<25} {:<10} {}",
394 p.name, p.address, p.status, sessions
395 ));
396 }
397 lines.join("\n")
398}
399
400fn format_interventions(events: &[InterventionEventResponse]) -> String {
402 if events.is_empty() {
403 return "No intervention events.".into();
404 }
405 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
406 for e in events {
407 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
408 }
409 lines.join("\n")
410}
411
/// Builds (without running) the platform command that opens `url` in the
/// default browser: `open` on macOS, `cmd /C start` on Windows, and
/// `xdg-open` everywhere else.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    let program = if cfg!(target_os = "windows") {
        "cmd"
    } else if cfg!(target_os = "macos") {
        "open"
    } else {
        "xdg-open"
    };
    let mut opener = std::process::Command::new(program);
    if cfg!(target_os = "windows") {
        // `start` is a `cmd` builtin, so it has to go through `cmd /C`.
        opener.args(["/C", "start"]);
    }
    opener.arg(url);
    opener
}
441
442#[cfg(not(coverage))]
444fn open_browser(url: &str) -> Result<()> {
445 build_open_command(url).status()?;
446 Ok(())
447}
448
/// Coverage-build stand-in for `open_browser`: does nothing and reports success.
#[cfg(coverage)]
fn open_browser(_url: &str) -> Result<()> {
    Ok(())
}
454
/// Builds (without running) the command that attaches the terminal to a
/// session backend.
///
/// Ids of the form `docker:<container>` map to
/// `docker exec -it <container> /bin/sh`. Any other id is treated as a tmux
/// target (`tmux attach-session -t <id>`), except on Windows where a command
/// that merely echoes an explanatory message is returned.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    match backend_session_id.strip_prefix("docker:") {
        Some(container) => {
            let mut docker = std::process::Command::new("docker");
            docker
                .arg("exec")
                .arg("-it")
                .arg(container)
                .arg("/bin/sh");
            docker
        }
        None => {
            #[cfg(not(target_os = "windows"))]
            {
                let mut tmux = std::process::Command::new("tmux");
                tmux.arg("attach-session").arg("-t").arg(backend_session_id);
                tmux
            }
            #[cfg(target_os = "windows")]
            {
                let mut notice = std::process::Command::new("cmd");
                notice.arg("/C").arg("echo").arg(
                    "Attach not available on Windows. Use the web UI or --runtime docker.",
                );
                notice
            }
        }
    }
}
484
485#[cfg(not(any(test, coverage, target_os = "windows")))]
487fn attach_session(backend_session_id: &str) -> Result<()> {
488 let status = build_attach_command(backend_session_id).status()?;
489 if !status.success() {
490 anyhow::bail!("attach failed with {status}");
491 }
492 Ok(())
493}
494
/// Windows variant of `attach_session`: prints a notice to stderr instead of
/// attaching, and reports success.
#[cfg(all(target_os = "windows", not(test), not(coverage)))]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
    Ok(())
}
501
/// Test/coverage stand-in for `attach_session`: never spawns a process and
/// always succeeds.
#[cfg(any(test, coverage))]
#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    Ok(())
}
508
509fn api_error(text: &str) -> anyhow::Error {
511 serde_json::from_str::<serde_json::Value>(text)
512 .ok()
513 .and_then(|v| v["error"].as_str().map(String::from))
514 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
515}
516
517async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
519 if resp.status().is_success() {
520 Ok(resp.text().await?)
521 } else {
522 let text = resp.text().await?;
523 Err(api_error(&text))
524 }
525}
526
527fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
529 if err.is_connect() {
530 anyhow::anyhow!(
531 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
532 )
533 } else {
534 anyhow::anyhow!("Network error connecting to {node}: {err}")
535 }
536}
537
/// Reports whether `node` (a `host` or `host:port` string) refers to the
/// local machine: `localhost`, `127.0.0.1`, the bare IPv6 loopback `::1`, or
/// anything starting with the bracketed form `[::1]`.
fn is_localhost(node: &str) -> bool {
    // The IPv6 forms contain colons, so handle them before splitting off a port.
    if node == "::1" || node.starts_with("[::1]") {
        return true;
    }
    let host = node.split_once(':').map_or(node, |(host, _port)| host);
    matches!(host, "localhost" | "127.0.0.1")
}
543
544async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
546 let resp = client
547 .get(format!("{base}/api/v1/auth/token"))
548 .send()
549 .await
550 .ok()?;
551 let body: AuthTokenResponse = resp.json().await.ok()?;
552 if body.token.is_empty() {
553 None
554 } else {
555 Some(body.token)
556 }
557}
558
559async fn resolve_token(
561 client: &reqwest::Client,
562 base: &str,
563 node: &str,
564 explicit: Option<&str>,
565) -> Option<String> {
566 if let Some(t) = explicit {
567 return Some(t.to_owned());
568 }
569 if is_localhost(node) {
570 return discover_token(client, base).await;
571 }
572 None
573}
574
/// Reports whether `node` is a bare name (no `:` anywhere) that still has to
/// be resolved to a `host:port` address.
fn node_needs_resolution(node: &str) -> bool {
    node.split_once(':').is_none()
}
579
580#[cfg(not(coverage))]
587async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
588 if !node_needs_resolution(node) {
590 return (node.to_owned(), None);
591 }
592
593 let local_base = "http://localhost:7433";
595 let mut resolved_address: Option<String> = None;
596
597 if let Ok(resp) = client
598 .get(format!("{local_base}/api/v1/peers"))
599 .send()
600 .await
601 && let Ok(peers_resp) = resp.json::<PeersResponse>().await
602 {
603 for peer in &peers_resp.peers {
604 if peer.name == node {
605 resolved_address = Some(peer.address.clone());
606 break;
607 }
608 }
609 }
610
611 let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
612
613 let peer_token = if let Ok(resp) = client
615 .get(format!("{local_base}/api/v1/config"))
616 .send()
617 .await
618 && let Ok(config) = resp.json::<ConfigResponse>().await
619 && let Some(entry) = config.peers.get(node)
620 {
621 entry.token().map(String::from)
622 } else {
623 None
624 };
625
626 (address, peer_token)
627}
628
/// Coverage-build variant of `resolve_node`: performs no network lookups.
/// Bare names get the default port `7433` appended; other values are returned
/// unchanged. The token is always `None`.
#[cfg(coverage)]
async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
    if node_needs_resolution(node) {
        (format!("{node}:7433"), None)
    } else {
        (node.to_owned(), None)
    }
}
638
639fn authed_get(
641 client: &reqwest::Client,
642 url: String,
643 token: Option<&str>,
644) -> reqwest::RequestBuilder {
645 let req = client.get(url);
646 if let Some(t) = token {
647 req.bearer_auth(t)
648 } else {
649 req
650 }
651}
652
653fn authed_post(
655 client: &reqwest::Client,
656 url: String,
657 token: Option<&str>,
658) -> reqwest::RequestBuilder {
659 let req = client.post(url);
660 if let Some(t) = token {
661 req.bearer_auth(t)
662 } else {
663 req
664 }
665}
666
667fn authed_delete(
669 client: &reqwest::Client,
670 url: String,
671 token: Option<&str>,
672) -> reqwest::RequestBuilder {
673 let req = client.delete(url);
674 if let Some(t) = token {
675 req.bearer_auth(t)
676 } else {
677 req
678 }
679}
680
681#[cfg(not(coverage))]
683fn authed_put(
684 client: &reqwest::Client,
685 url: String,
686 token: Option<&str>,
687) -> reqwest::RequestBuilder {
688 let req = client.put(url);
689 if let Some(t) = token {
690 req.bearer_auth(t)
691 } else {
692 req
693 }
694}
695
696async fn fetch_output(
698 client: &reqwest::Client,
699 base: &str,
700 name: &str,
701 lines: usize,
702 token: Option<&str>,
703) -> Result<String> {
704 let resp = authed_get(
705 client,
706 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
707 token,
708 )
709 .send()
710 .await?;
711 let text = ok_or_api_error(resp).await?;
712 let output: OutputResponse = serde_json::from_str(&text)?;
713 Ok(output.output)
714}
715
716async fn fetch_session_status(
718 client: &reqwest::Client,
719 base: &str,
720 name: &str,
721 token: Option<&str>,
722) -> Result<String> {
723 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
724 .send()
725 .await?;
726 let text = ok_or_api_error(resp).await?;
727 let session: Session = serde_json::from_str(&text)?;
728 Ok(session.status.to_string())
729}
730
731async fn check_session_alive(
735 client: &reqwest::Client,
736 base: &str,
737 session_id: &str,
738 token: Option<&str>,
739) -> Result<()> {
740 for _ in 0..3 {
742 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
743 let resp = authed_get(
745 client,
746 format!("{base}/api/v1/sessions/{session_id}"),
747 token,
748 )
749 .send()
750 .await;
751 if let Ok(resp) = resp
752 && let Ok(text) = ok_or_api_error(resp).await
753 && let Ok(session) = serde_json::from_str::<Session>(&text)
754 {
755 match session.status.to_string().as_str() {
756 "creating" => continue,
757 "lost" | "killed" => {
758 anyhow::bail!(
759 "Session \"{}\" exited immediately — the command may have failed.\n Check logs: pulpo logs {}",
760 session.name,
761 session.name
762 );
763 }
764 _ => return Ok(()),
765 }
766 }
767 break;
769 }
770 Ok(())
771}
772
/// Returns the part of `new` that was not already present at the end of `prev`.
///
/// Both inputs are snapshots of the same scrolling output. The function looks
/// for the last line of `prev` inside `new` (searching from the end), checks
/// that the lines leading up to it match the tail of `prev`, and returns
/// everything in `new` after that point.
///
/// * `prev` empty: all of `new` is returned.
/// * `new` has no lines: `""` is returned.
/// * No overlap found: all of `new` is returned.
///
/// Lines are compared without their terminators, so `\n` and `\r\n` endings
/// are both handled; the returned slice starts exactly after the terminator
/// of the last overlapping line.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // `prev` is non-empty so it has at least one line; stay panic-free anyway.
    let Some(&last_prev) = prev_lines.last() else {
        return new;
    };

    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Compare as many trailing lines of `prev` as fit before position `i`.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail == new_overlap {
            // Measure the consumed prefix on the raw segments (terminators
            // included) so "\r\n" endings and a missing final newline are
            // accounted for exactly.
            let consumed: usize = new
                .split_inclusive('\n')
                .take(i + 1)
                .map(str::len)
                .sum();
            return new.get(consumed..).unwrap_or("");
        }
    }

    // No overlap: the output scrolled past everything seen before.
    new
}
815
816async fn follow_logs(
818 client: &reqwest::Client,
819 base: &str,
820 name: &str,
821 lines: usize,
822 token: Option<&str>,
823 writer: &mut (dyn std::io::Write + Send),
824) -> Result<()> {
825 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
826 write!(writer, "{prev_output}")?;
827
828 let mut unchanged_ticks: u32 = 0;
829
830 loop {
831 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
832
833 let new_output = fetch_output(client, base, name, lines, token).await?;
835
836 let diff = diff_output(&prev_output, &new_output);
837 if diff.is_empty() {
838 unchanged_ticks += 1;
839 } else {
840 write!(writer, "{diff}")?;
841 unchanged_ticks = 0;
842 }
843
844 if new_output.contains(AGENT_EXIT_MARKER) {
846 break;
847 }
848
849 prev_output = new_output;
850
851 if unchanged_ticks >= 3 {
853 let status = fetch_session_status(client, base, name, token).await?;
854 let is_terminal = status == "ready" || status == "killed" || status == "lost";
855 if is_terminal {
856 break;
857 }
858 }
859 }
860 Ok(())
861}
862
863#[cfg(not(coverage))]
867async fn execute_schedule(
868 client: &reqwest::Client,
869 action: &ScheduleAction,
870 base: &str,
871 token: Option<&str>,
872) -> Result<String> {
873 match action {
874 ScheduleAction::Add {
875 name,
876 cron,
877 workdir,
878 node,
879 ink,
880 description,
881 command,
882 } => {
883 let cmd = if command.is_empty() {
884 None
885 } else {
886 Some(command.join(" "))
887 };
888 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
889 std::env::current_dir()
890 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
891 });
892 let mut body = serde_json::json!({
893 "name": name,
894 "cron": cron,
895 "workdir": resolved_workdir,
896 });
897 if let Some(c) = &cmd {
898 body["command"] = serde_json::json!(c);
899 }
900 if let Some(n) = node {
901 body["target_node"] = serde_json::json!(n);
902 }
903 if let Some(i) = ink {
904 body["ink"] = serde_json::json!(i);
905 }
906 if let Some(d) = description {
907 body["description"] = serde_json::json!(d);
908 }
909 let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
910 .json(&body)
911 .send()
912 .await?;
913 ok_or_api_error(resp).await?;
914 Ok(format!("Created schedule \"{name}\""))
915 }
916 ScheduleAction::List => {
917 let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
918 .send()
919 .await?;
920 let text = ok_or_api_error(resp).await?;
921 let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
922 Ok(format_schedules(&schedules))
923 }
924 ScheduleAction::Remove { name } => {
925 let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
926 .send()
927 .await?;
928 ok_or_api_error(resp).await?;
929 Ok(format!("Removed schedule \"{name}\""))
930 }
931 ScheduleAction::Pause { name } => {
932 let body = serde_json::json!({ "enabled": false });
933 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
934 .json(&body)
935 .send()
936 .await?;
937 ok_or_api_error(resp).await?;
938 Ok(format!("Paused schedule \"{name}\""))
939 }
940 ScheduleAction::Resume { name } => {
941 let body = serde_json::json!({ "enabled": true });
942 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
943 .json(&body)
944 .send()
945 .await?;
946 ok_or_api_error(resp).await?;
947 Ok(format!("Resumed schedule \"{name}\""))
948 }
949 }
950}
951
/// Coverage-build stand-in for `execute_schedule`: performs no requests and
/// returns an empty string.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_schedule(
    _client: &reqwest::Client,
    _action: &ScheduleAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::new())
}
963
964#[cfg_attr(coverage, allow(dead_code))]
968fn format_secrets(secrets: &[serde_json::Value]) -> String {
969 if secrets.is_empty() {
970 return "No secrets configured.".into();
971 }
972 let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
973 for s in secrets {
974 let name = s["name"].as_str().unwrap_or("?");
975 let env_display = s["env"]
976 .as_str()
977 .map_or_else(|| name.to_owned(), String::from);
978 let created = s["created_at"]
979 .as_str()
980 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
981 lines.push(format!("{name:<24} {env_display:<24} {created}"));
982 }
983 lines.join("\n")
984}
985
986#[cfg(not(coverage))]
988async fn execute_secret(
989 client: &reqwest::Client,
990 action: &SecretAction,
991 base: &str,
992 token: Option<&str>,
993) -> Result<String> {
994 match action {
995 SecretAction::Set { name, value, env } => {
996 let mut body = serde_json::json!({ "value": value });
997 if let Some(e) = env {
998 body["env"] = serde_json::json!(e);
999 }
1000 let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
1001 .json(&body)
1002 .send()
1003 .await?;
1004 ok_or_api_error(resp).await?;
1005 Ok(format!("Secret \"{name}\" set."))
1006 }
1007 SecretAction::List => {
1008 let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
1009 .send()
1010 .await?;
1011 let text = ok_or_api_error(resp).await?;
1012 let parsed: serde_json::Value = serde_json::from_str(&text)?;
1013 let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1014 Ok(format_secrets(secrets))
1015 }
1016 SecretAction::Delete { name } => {
1017 let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1018 .send()
1019 .await?;
1020 ok_or_api_error(resp).await?;
1021 Ok(format!("Secret \"{name}\" deleted."))
1022 }
1023 }
1024}
1025
/// Coverage-build stand-in for `execute_secret`: performs no requests and
/// returns an empty string.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_secret(
    _client: &reqwest::Client,
    _action: &SecretAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::new())
}
1037
1038#[cfg_attr(coverage, allow(dead_code))]
1040fn format_schedules(schedules: &[serde_json::Value]) -> String {
1041 if schedules.is_empty() {
1042 return "No schedules.".into();
1043 }
1044 let mut lines = vec![format!(
1045 "{:<20} {:<18} {:<8} {:<12} {}",
1046 "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1047 )];
1048 for s in schedules {
1049 let name = s["name"].as_str().unwrap_or("?");
1050 let cron = s["cron"].as_str().unwrap_or("?");
1051 let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1052 "yes"
1053 } else {
1054 "no"
1055 };
1056 let last_run = s["last_run_at"]
1057 .as_str()
1058 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1059 let node = s["target_node"].as_str().unwrap_or("local");
1060 lines.push(format!(
1061 "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1062 ));
1063 }
1064 lines.join("\n")
1065}
1066
1067#[cfg(not(coverage))]
1071async fn select_best_node(
1072 client: &reqwest::Client,
1073 base: &str,
1074 token: Option<&str>,
1075) -> Result<(String, String)> {
1076 let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1077 .send()
1078 .await?;
1079 let text = ok_or_api_error(resp).await?;
1080 let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1081
1082 let mut best: Option<(String, String, f64)> = None; for peer in &peers_resp.peers {
1086 if peer.status != pulpo_common::peer::PeerStatus::Online {
1087 continue;
1088 }
1089 let sessions = peer.session_count.unwrap_or(0);
1090 let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1091 #[allow(clippy::cast_precision_loss)]
1093 let score = sessions as f64 - (mem as f64 / 1024.0);
1094 if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1095 best = Some((peer.address.clone(), peer.name.clone(), score));
1096 }
1097 }
1098
1099 match best {
1101 Some((addr, name, _)) => Ok((addr, name)),
1102 None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1103 }
1104}
1105
/// Coverage-build stand-in for `select_best_node`: performs no requests and
/// always selects `localhost:7433` with the name `local`.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn select_best_node(
    _client: &reqwest::Client,
    _base: &str,
    _token: Option<&str>,
) -> Result<(String, String)> {
    Ok(("localhost:7433".into(), "local".into()))
}
1116
1117#[allow(clippy::too_many_lines)]
1119pub async fn execute(cli: &Cli) -> Result<String> {
1120 let client = reqwest::Client::new();
1121 let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
1122 let url = base_url(&resolved_node);
1123 let node = &resolved_node;
1124 let token = resolve_token(&client, &url, node, cli.token.as_deref())
1125 .await
1126 .or(peer_token);
1127
1128 if cli.command.is_none() && cli.path.is_none() {
1130 use clap::CommandFactory;
1132 let mut cmd = Cli::command();
1133 cmd.print_help()?;
1134 println!();
1135 return Ok(String::new());
1136 }
1137 if cli.command.is_none() {
1138 let path = cli.path.as_deref().unwrap_or(".");
1139 let resolved_workdir = resolve_path(path);
1140 let base_name = derive_session_name(&resolved_workdir);
1141 let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
1142 let body = serde_json::json!({
1143 "name": name,
1144 "workdir": resolved_workdir,
1145 });
1146 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1147 .json(&body)
1148 .send()
1149 .await
1150 .map_err(|e| friendly_error(&e, node))?;
1151 let text = ok_or_api_error(resp).await?;
1152 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1153 let msg = format!(
1154 "Created session \"{}\" ({})",
1155 resp.session.name, resp.session.id
1156 );
1157 let backend_id = resp
1158 .session
1159 .backend_session_id
1160 .as_deref()
1161 .unwrap_or(&resp.session.name);
1162 eprintln!("{msg}");
1163 attach_session(backend_id)?;
1166 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1167 }
1168
1169 match cli.command.as_ref().unwrap() {
1170 Commands::Attach { name } => {
1171 let resp = authed_get(
1173 &client,
1174 format!("{url}/api/v1/sessions/{name}"),
1175 token.as_deref(),
1176 )
1177 .send()
1178 .await
1179 .map_err(|e| friendly_error(&e, node))?;
1180 let text = ok_or_api_error(resp).await?;
1181 let session: Session = serde_json::from_str(&text)?;
1182 match session.status.to_string().as_str() {
1183 "lost" => {
1184 anyhow::bail!(
1185 "Session \"{name}\" is lost (agent process died). Resume it first:\n pulpo resume {name}"
1186 );
1187 }
1188 "killed" => {
1189 anyhow::bail!(
1190 "Session \"{name}\" is {} — cannot attach to a killed session.",
1191 session.status
1192 );
1193 }
1194 _ => {}
1195 }
1196 let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1197 attach_session(&backend_id)?;
1198 Ok(format!("Detached from session {name}."))
1199 }
1200 Commands::Input { name, text } => {
1201 let input_text = text.as_deref().unwrap_or("\n");
1202 let body = serde_json::json!({ "text": input_text });
1203 let resp = authed_post(
1204 &client,
1205 format!("{url}/api/v1/sessions/{name}/input"),
1206 token.as_deref(),
1207 )
1208 .json(&body)
1209 .send()
1210 .await
1211 .map_err(|e| friendly_error(&e, node))?;
1212 ok_or_api_error(resp).await?;
1213 Ok(format!("Sent input to session {name}."))
1214 }
1215 Commands::List { all } => {
1216 let list_url = if *all {
1217 format!("{url}/api/v1/sessions")
1218 } else {
1219 format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
1220 };
1221 let resp = authed_get(&client, list_url, token.as_deref())
1222 .send()
1223 .await
1224 .map_err(|e| friendly_error(&e, node))?;
1225 let text = ok_or_api_error(resp).await?;
1226 let sessions: Vec<Session> = serde_json::from_str(&text)?;
1227 Ok(format_sessions(&sessions))
1228 }
1229 Commands::Nodes => {
1230 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1231 .send()
1232 .await
1233 .map_err(|e| friendly_error(&e, node))?;
1234 let text = ok_or_api_error(resp).await?;
1235 let resp: PeersResponse = serde_json::from_str(&text)?;
1236 Ok(format_nodes(&resp))
1237 }
1238 Commands::Spawn {
1239 workdir,
1240 name,
1241 ink,
1242 description,
1243 detach,
1244 idle_threshold,
1245 auto,
1246 worktree,
1247 runtime,
1248 secret,
1249 command,
1250 } => {
1251 let cmd = if command.is_empty() {
1252 None
1253 } else {
1254 Some(command.join(" "))
1255 };
1256 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1258 std::env::current_dir()
1259 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1260 });
1261 let resolved_name = if let Some(n) = name {
1263 n.clone()
1264 } else {
1265 let base_name = derive_session_name(&resolved_workdir);
1266 deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1267 };
1268 let mut body = serde_json::json!({
1269 "name": resolved_name,
1270 "workdir": resolved_workdir,
1271 });
1272 if let Some(c) = &cmd {
1273 body["command"] = serde_json::json!(c);
1274 }
1275 if let Some(i) = ink {
1276 body["ink"] = serde_json::json!(i);
1277 }
1278 if let Some(d) = description {
1279 body["description"] = serde_json::json!(d);
1280 }
1281 if let Some(t) = idle_threshold {
1282 body["idle_threshold_secs"] = serde_json::json!(t);
1283 }
1284 if *worktree {
1285 body["worktree"] = serde_json::json!(true);
1286 eprintln!(
1287 "Worktree: branch pulpo/{resolved_name} in {resolved_workdir}/.pulpo/worktrees/{resolved_name}/"
1288 );
1289 }
1290 if let Some(rt) = runtime {
1291 body["runtime"] = serde_json::json!(rt);
1292 }
1293 if !secret.is_empty() {
1294 body["secrets"] = serde_json::json!(secret);
1295 }
1296 let spawn_url = if *auto {
1297 let (auto_addr, auto_name) =
1298 select_best_node(&client, &url, token.as_deref()).await?;
1299 eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1300 base_url(&auto_addr)
1301 } else {
1302 url.clone()
1303 };
1304 let resp = authed_post(
1305 &client,
1306 format!("{spawn_url}/api/v1/sessions"),
1307 token.as_deref(),
1308 )
1309 .json(&body)
1310 .send()
1311 .await
1312 .map_err(|e| friendly_error(&e, node))?;
1313 let text = ok_or_api_error(resp).await?;
1314 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1315 let msg = format!(
1316 "Created session \"{}\" ({})",
1317 resp.session.name, resp.session.id
1318 );
1319 if !detach {
1320 let backend_id = resp
1321 .session
1322 .backend_session_id
1323 .as_deref()
1324 .unwrap_or(&resp.session.name);
1325 eprintln!("{msg}");
1326 if cmd.is_some() {
1329 let sid = resp.session.id.to_string();
1330 check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1331 }
1332 attach_session(backend_id)?;
1333 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1334 }
1335 Ok(msg)
1336 }
1337 Commands::Kill { name } => {
1338 let resp = authed_post(
1339 &client,
1340 format!("{url}/api/v1/sessions/{name}/kill"),
1341 token.as_deref(),
1342 )
1343 .send()
1344 .await
1345 .map_err(|e| friendly_error(&e, node))?;
1346 ok_or_api_error(resp).await?;
1347 Ok(format!("Session {name} killed."))
1348 }
1349 Commands::Delete { name } => {
1350 let resp = authed_delete(
1351 &client,
1352 format!("{url}/api/v1/sessions/{name}"),
1353 token.as_deref(),
1354 )
1355 .send()
1356 .await
1357 .map_err(|e| friendly_error(&e, node))?;
1358 ok_or_api_error(resp).await?;
1359 Ok(format!("Session {name} deleted."))
1360 }
1361 Commands::Logs {
1362 name,
1363 lines,
1364 follow,
1365 } => {
1366 if *follow {
1367 let mut stdout = std::io::stdout();
1368 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1369 .await
1370 .map_err(|e| {
1371 match e.downcast::<reqwest::Error>() {
1373 Ok(re) => friendly_error(&re, node),
1374 Err(other) => other,
1375 }
1376 })?;
1377 Ok(String::new())
1378 } else {
1379 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1380 .await
1381 .map_err(|e| match e.downcast::<reqwest::Error>() {
1382 Ok(re) => friendly_error(&re, node),
1383 Err(other) => other,
1384 })?;
1385 Ok(output)
1386 }
1387 }
1388 Commands::Interventions { name } => {
1389 let resp = authed_get(
1390 &client,
1391 format!("{url}/api/v1/sessions/{name}/interventions"),
1392 token.as_deref(),
1393 )
1394 .send()
1395 .await
1396 .map_err(|e| friendly_error(&e, node))?;
1397 let text = ok_or_api_error(resp).await?;
1398 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1399 Ok(format_interventions(&events))
1400 }
1401 Commands::Ui => {
1402 let dashboard = base_url(node);
1403 open_browser(&dashboard)?;
1404 Ok(format!("Opening {dashboard}"))
1405 }
1406 Commands::Resume { name } => {
1407 let resp = authed_post(
1408 &client,
1409 format!("{url}/api/v1/sessions/{name}/resume"),
1410 token.as_deref(),
1411 )
1412 .send()
1413 .await
1414 .map_err(|e| friendly_error(&e, node))?;
1415 let text = ok_or_api_error(resp).await?;
1416 let session: Session = serde_json::from_str(&text)?;
1417 let backend_id = session
1418 .backend_session_id
1419 .as_deref()
1420 .unwrap_or(&session.name);
1421 eprintln!("Resumed session \"{}\"", session.name);
1422 let sid = session.id.to_string();
1423 check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1424 attach_session(backend_id)?;
1425 Ok(format!("Detached from session \"{}\".", session.name))
1426 }
1427 Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1428 .await
1429 .map_err(|e| match e.downcast::<reqwest::Error>() {
1430 Ok(re) => friendly_error(&re, node),
1431 Err(other) => other,
1432 }),
1433 Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
1434 .await
1435 .map_err(|e| match e.downcast::<reqwest::Error>() {
1436 Ok(re) => friendly_error(&re, node),
1437 Err(other) => other,
1438 }),
1439 }
1440}
1441
1442#[cfg(test)]
1443mod tests {
1444 use super::*;
1445
/// `base_url` turns a `host:port` node address into an `http://` URL.
#[test]
fn test_base_url() {
    let cases = [
        ("localhost:7433", "http://localhost:7433"),
        ("my-machine:9999", "http://my-machine:9999"),
    ];
    for (node, expected) in cases {
        assert_eq!(base_url(node), expected);
    }
}
1451
/// `pulpo list` selects the `List` subcommand and leaves `node` at its default.
#[test]
fn test_cli_parse_list() {
    let Cli { node, command, .. } = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    assert_eq!(node, "localhost:7433");
    match command {
        Some(Commands::List { .. }) => {}
        other => panic!("expected List, got {other:?}"),
    }
}
1458
/// `pulpo nodes` selects the `Nodes` subcommand.
#[test]
fn test_cli_parse_nodes() {
    let parsed = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
    match parsed.command {
        Some(Commands::Nodes) => {}
        other => panic!("expected Nodes, got {other:?}"),
    }
}
1464
/// `pulpo ui` selects the `Ui` subcommand.
#[test]
fn test_cli_parse_ui() {
    let parsed = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
    match parsed.command {
        Some(Commands::Ui) => {}
        other => panic!("expected Ui, got {other:?}"),
    }
}
1470
/// Pins how `--node <addr> ui` is parsed: the node override is honoured and the trailing
/// `ui` token ends up in the positional `path` argument.
///
/// NOTE(review): presumably a consequence of `args_conflicts_with_subcommands = true` on
/// `Cli` — confirm against the clap documentation.
#[test]
fn test_cli_parse_ui_custom_node() {
    let Cli { node, path, .. } =
        Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
    assert_eq!(node, "mac-mini:7433");
    assert_eq!(path.as_deref(), Some("ui"));
}
1478
/// `build_open_command` passes the URL as the only argument and picks the platform opener
/// (`open` on macOS, `xdg-open` on Linux).
#[test]
fn test_build_open_command() {
    let url = "http://localhost:7433";
    let opener = build_open_command(url);

    let argv: Vec<&std::ffi::OsStr> = opener.get_args().collect();
    assert_eq!(argv, [url]);

    // Other targets are intentionally left unchecked, as in the cfg-gated original.
    if cfg!(target_os = "macos") {
        assert_eq!(opener.get_program(), "open");
    } else if cfg!(target_os = "linux") {
        assert_eq!(opener.get_program(), "xdg-open");
    }
}
1489
/// `spawn` takes the session name positionally, `--workdir` as a flag, and everything after
/// `--` as the command to run.
#[test]
fn test_cli_parse_spawn() {
    let argv = [
        "pulpo",
        "spawn",
        "my-task",
        "--workdir",
        "/tmp/repo",
        "--",
        "claude",
        "-p",
        "Fix the bug",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { name, workdir, command, .. }) = &parsed.command else {
        panic!("expected Spawn, got {:?}", parsed.command);
    };
    assert_eq!(name.as_deref(), Some("my-task"));
    assert_eq!(workdir.as_deref(), Some("/tmp/repo"));
    assert_eq!(command, &["claude", "-p", "Fix the bug"]);
}
1511
/// `spawn --ink <name>` stores the ink name.
#[test]
fn test_cli_parse_spawn_with_ink() {
    let parsed = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
    let Some(Commands::Spawn { ink, .. }) = &parsed.command else {
        panic!("expected Spawn, got {:?}", parsed.command);
    };
    assert_eq!(ink.as_deref(), Some("coder"));
}
1520
/// `spawn --description <text>` stores the description verbatim.
#[test]
fn test_cli_parse_spawn_with_description() {
    let argv = ["pulpo", "spawn", "my-task", "--description", "Fix the bug"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { description, .. }) = &parsed.command else {
        panic!("expected Spawn, got {:?}", parsed.command);
    };
    assert_eq!(description.as_deref(), Some("Fix the bug"));
}
1531
/// The first positional after `spawn` is the session name; tokens after `--` are the command.
#[test]
fn test_cli_parse_spawn_name_positional() {
    let argv = ["pulpo", "spawn", "portal", "--", "echo", "hello"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { name, command, .. }) = &parsed.command else {
        panic!("expected Spawn, got {:?}", parsed.command);
    };
    assert_eq!(name.as_deref(), Some("portal"));
    assert_eq!(command, &["echo", "hello"]);
}
1541
/// Without a trailing `-- <command>`, the parsed command vector is empty.
#[test]
fn test_cli_parse_spawn_no_command() {
    let parsed = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
    let Some(Commands::Spawn { command, .. }) = &parsed.command else {
        panic!("expected Spawn, got {:?}", parsed.command);
    };
    assert!(command.is_empty());
}
1550
/// `--idle-threshold 0` is accepted and kept as `Some(0)` rather than collapsing to `None`.
#[test]
fn test_cli_parse_spawn_idle_threshold() {
    let argv = ["pulpo", "spawn", "my-task", "--idle-threshold", "0"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { idle_threshold, .. }) = &parsed.command else {
        panic!("expected Spawn, got {:?}", parsed.command);
    };
    assert_eq!(*idle_threshold, Some(0));
}
1560
/// `--idle-threshold 60` is parsed as the numeric value 60.
#[test]
fn test_cli_parse_spawn_idle_threshold_60() {
    let argv = ["pulpo", "spawn", "my-task", "--idle-threshold", "60"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { idle_threshold, .. }) = &parsed.command else {
        panic!("expected Spawn, got {:?}", parsed.command);
    };
    assert_eq!(*idle_threshold, Some(60));
}
1570
/// Repeating `--secret` accumulates the names in the order given.
#[test]
fn test_cli_parse_spawn_secrets() {
    let argv = [
        "pulpo",
        "spawn",
        "my-task",
        "--secret",
        "GITHUB_TOKEN",
        "--secret",
        "NPM_TOKEN",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { secret, .. }) = &parsed.command else {
        panic!("expected Spawn, got {:?}", parsed.command);
    };
    assert_eq!(secret, &["GITHUB_TOKEN", "NPM_TOKEN"]);
}
1588
/// With no `--secret` flags the secret list is empty.
#[test]
fn test_cli_parse_spawn_no_secrets() {
    let parsed = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
    let Some(Commands::Spawn { secret, .. }) = &parsed.command else {
        panic!("expected Spawn, got {:?}", parsed.command);
    };
    assert!(secret.is_empty());
}
1597
/// `secret set <name> <value> --env <VAR>` captures the name, the value and the env override.
#[test]
fn test_cli_parse_secret_set_with_env() {
    let argv = [
        "pulpo",
        "secret",
        "set",
        "GH_WORK",
        "token123",
        "--env",
        "GITHUB_TOKEN",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Secret {
        action: SecretAction::Set { name, value, env },
    }) = &parsed.command
    else {
        panic!("expected Secret::Set, got {:?}", parsed.command);
    };
    assert_eq!(name, "GH_WORK");
    assert_eq!(value, "token123");
    assert_eq!(env.as_deref(), Some("GITHUB_TOKEN"));
}
1616
/// `secret set <name> <value>` without `--env` leaves the env override unset.
#[test]
fn test_cli_parse_secret_set_without_env() {
    let parsed = Cli::try_parse_from(["pulpo", "secret", "set", "MY_KEY", "val"]).unwrap();
    let Some(Commands::Secret {
        action: SecretAction::Set { name, value, env },
    }) = &parsed.command
    else {
        panic!("expected Secret::Set, got {:?}", parsed.command);
    };
    assert_eq!(name, "MY_KEY");
    assert_eq!(value, "val");
    assert!(env.is_none());
}
1626
/// `spawn --worktree` sets the worktree flag.
#[test]
fn test_cli_parse_spawn_worktree() {
    let parsed = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree"]).unwrap();
    let Some(Commands::Spawn { worktree, .. }) = &parsed.command else {
        panic!("expected Spawn, got {:?}", parsed.command);
    };
    assert!(*worktree);
}
1635
/// `spawn --detach` sets the detach flag.
#[test]
fn test_cli_parse_spawn_detach() {
    let parsed = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
    let Some(Commands::Spawn { detach, .. }) = &parsed.command else {
        panic!("expected Spawn, got {:?}", parsed.command);
    };
    assert!(*detach);
}
1644
/// The short form `-d` is equivalent to `--detach`.
#[test]
fn test_cli_parse_spawn_detach_short() {
    let parsed = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
    let Some(Commands::Spawn { detach, .. }) = &parsed.command else {
        panic!("expected Spawn, got {:?}", parsed.command);
    };
    assert!(*detach);
}
1653
/// Without `-d`/`--detach`, the detach flag defaults to `false`.
#[test]
fn test_cli_parse_spawn_detach_default() {
    let parsed = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
    let Some(Commands::Spawn { detach, .. }) = &parsed.command else {
        panic!("expected Spawn, got {:?}", parsed.command);
    };
    assert!(!*detach);
}
1662
/// `logs <name>` defaults to 100 lines and does not follow.
#[test]
fn test_cli_parse_logs() {
    let parsed = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
    let Some(Commands::Logs { name, lines, follow }) = &parsed.command else {
        panic!("expected Logs, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(*lines, 100);
    assert!(!*follow);
}
1671
/// `logs --lines N` overrides the default line count.
#[test]
fn test_cli_parse_logs_with_lines() {
    let argv = ["pulpo", "logs", "my-session", "--lines", "50"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Logs { name, lines, follow }) = &parsed.command else {
        panic!("expected Logs, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(*lines, 50);
    assert!(!*follow);
}
1680
/// `logs --follow` sets the follow flag.
#[test]
fn test_cli_parse_logs_follow() {
    let parsed = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
    let Some(Commands::Logs { name, follow, .. }) = &parsed.command else {
        panic!("expected Logs, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(*follow);
}
1689
/// The short form `-f` is equivalent to `--follow`.
#[test]
fn test_cli_parse_logs_follow_short() {
    let parsed = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
    let Some(Commands::Logs { name, follow, .. }) = &parsed.command else {
        panic!("expected Logs, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(*follow);
}
1698
/// `kill <name>` captures the session name.
#[test]
fn test_cli_parse_kill() {
    let parsed = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
    let Some(Commands::Kill { name }) = &parsed.command else {
        panic!("expected Kill, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
1707
/// `delete <name>` captures the session name.
#[test]
fn test_cli_parse_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
    let Some(Commands::Delete { name }) = &parsed.command else {
        panic!("expected Delete, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
1716
/// `resume <name>` captures the session name.
#[test]
fn test_cli_parse_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
    let Some(Commands::Resume { name }) = &parsed.command else {
        panic!("expected Resume, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
1725
/// `input <name> <text>` captures both the session name and the text to send.
#[test]
fn test_cli_parse_input() {
    let parsed = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
    let Some(Commands::Input { name, text }) = &parsed.command else {
        panic!("expected Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("yes"));
}
1734
/// `input <name>` with no text is valid and leaves `text` unset.
#[test]
fn test_cli_parse_input_no_text() {
    let parsed = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
    let Some(Commands::Input { name, text }) = &parsed.command else {
        panic!("expected Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(text.is_none());
}
1743
/// The `i` alias resolves to the `Input` subcommand.
#[test]
fn test_cli_parse_input_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
    let Some(Commands::Input { name, text }) = &parsed.command else {
        panic!("expected Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("y"));
}
1752
/// Pins how `--node <addr> list` is parsed: the node override is honoured and the trailing
/// `list` token ends up in the positional `path` argument.
///
/// NOTE(review): presumably a consequence of `args_conflicts_with_subcommands = true` on
/// `Cli` — confirm against the clap documentation.
#[test]
fn test_cli_parse_custom_node() {
    let Cli { node, path, .. } =
        Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
    assert_eq!(node, "win-pc:8080");
    assert_eq!(path.as_deref(), Some("list"));
}
1760
/// `--version` makes `try_parse_from` return an error of kind `DisplayVersion`.
#[test]
fn test_cli_version() {
    let kind = Cli::try_parse_from(["pulpo", "--version"])
        .unwrap_err()
        .kind();
    assert_eq!(kind, clap::error::ErrorKind::DisplayVersion);
}
1768
/// A bare `pulpo` invocation parses successfully with neither a subcommand nor a path.
#[test]
fn test_cli_parse_no_subcommand_succeeds() {
    let Cli { command, path, .. } = Cli::try_parse_from(["pulpo"]).unwrap();
    assert!(command.is_none());
    assert!(path.is_none());
}
1775
/// The `Debug` rendering of a parsed `Cli` mentions the selected subcommand.
#[test]
fn test_cli_debug() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    let rendered = format!("{:?}", parsed);
    assert!(rendered.contains("List"));
}
1782
/// The `Debug` rendering of `Commands::List` is the derived struct-variant form.
#[test]
fn test_commands_debug() {
    let rendered = format!("{:?}", Commands::List { all: false });
    assert_eq!(rendered, "List { all: false }");
}
1788
/// Canned JSON body for a single session (name `repo`, status `active`) used by the stub
/// server handlers in these tests.
///
/// The literal is split across lines for readability only; `concat!` joins the pieces into
/// one compact JSON object with no added whitespace.
const TEST_SESSION_JSON: &str = concat!(
    r#"{"id":"00000000-0000-0000-0000-000000000001","#,
    r#""name":"repo","workdir":"/tmp/repo","#,
    r#""command":"claude -p 'Fix bug'","description":null,"#,
    r#""status":"active","exit_code":null,"#,
    r#""backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"#,
    r#""intervention_code":null,"intervention_reason":null,"intervention_at":null,"#,
    r#""last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"#,
    r#""created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#,
);
1791
1792 fn test_create_response_json() -> String {
1794 format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1795 }
1796
1797 async fn start_test_server() -> String {
1799 use axum::http::StatusCode;
1800 use axum::{
1801 Json, Router,
1802 routing::{get, post},
1803 };
1804
1805 let create_json = test_create_response_json();
1806
1807 let app = Router::new()
1808 .route(
1809 "/api/v1/sessions",
1810 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1811 (StatusCode::CREATED, create_json.clone())
1812 }),
1813 )
1814 .route(
1815 "/api/v1/sessions/{id}",
1816 get(|| async { TEST_SESSION_JSON.to_owned() })
1817 .delete(|| async { StatusCode::NO_CONTENT }),
1818 )
1819 .route(
1820 "/api/v1/sessions/{id}/kill",
1821 post(|| async { StatusCode::NO_CONTENT }),
1822 )
1823 .route(
1824 "/api/v1/sessions/{id}/output",
1825 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1826 )
1827 .route(
1828 "/api/v1/peers",
1829 get(|| async {
1830 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1831 }),
1832 )
1833 .route(
1834 "/api/v1/sessions/{id}/resume",
1835 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1836 )
1837 .route(
1838 "/api/v1/sessions/{id}/interventions",
1839 get(|| async { "[]".to_owned() }),
1840 )
1841 .route(
1842 "/api/v1/sessions/{id}/input",
1843 post(|| async { StatusCode::NO_CONTENT }),
1844 )
1845 .route(
1846 "/api/v1/schedules",
1847 get(|| async { Json::<Vec<()>>(vec![]) })
1848 .post(|| async { StatusCode::CREATED }),
1849 )
1850 .route(
1851 "/api/v1/schedules/{id}",
1852 axum::routing::put(|| async { StatusCode::OK })
1853 .delete(|| async { StatusCode::NO_CONTENT }),
1854 );
1855
1856 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1857 let addr = listener.local_addr().unwrap();
1858 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1859 format!("127.0.0.1:{}", addr.port())
1860 }
1861
1862 #[tokio::test]
1863 async fn test_execute_list_success() {
1864 let node = start_test_server().await;
1865 let cli = Cli {
1866 node,
1867 token: None,
1868 command: Some(Commands::List { all: false }),
1869 path: None,
1870 };
1871 let result = execute(&cli).await.unwrap();
1872 assert_eq!(result, "No sessions.");
1873 }
1874
1875 #[tokio::test]
1876 async fn test_execute_nodes_success() {
1877 let node = start_test_server().await;
1878 let cli = Cli {
1879 node,
1880 token: None,
1881 command: Some(Commands::Nodes),
1882 path: None,
1883 };
1884 let result = execute(&cli).await.unwrap();
1885 assert!(result.contains("test"));
1886 assert!(result.contains("(local)"));
1887 assert!(result.contains("NAME"));
1888 }
1889
1890 #[tokio::test]
1891 async fn test_execute_spawn_success() {
1892 let node = start_test_server().await;
1893 let cli = Cli {
1894 node,
1895 token: None,
1896 command: Some(Commands::Spawn {
1897 name: Some("test".into()),
1898 workdir: Some("/tmp/repo".into()),
1899 ink: None,
1900 description: None,
1901 detach: true,
1902 idle_threshold: None,
1903 auto: false,
1904 worktree: false,
1905 runtime: None,
1906 secret: vec![],
1907 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1908 }),
1909 path: None,
1910 };
1911 let result = execute(&cli).await.unwrap();
1912 assert!(result.contains("Created session"));
1913 assert!(result.contains("repo"));
1914 }
1915
1916 #[tokio::test]
1917 async fn test_execute_spawn_with_all_flags() {
1918 let node = start_test_server().await;
1919 let cli = Cli {
1920 node,
1921 token: None,
1922 command: Some(Commands::Spawn {
1923 name: Some("test".into()),
1924 workdir: Some("/tmp/repo".into()),
1925 ink: Some("coder".into()),
1926 description: Some("Fix the bug".into()),
1927 detach: true,
1928 idle_threshold: None,
1929 auto: false,
1930 worktree: false,
1931 runtime: None,
1932 secret: vec![],
1933 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1934 }),
1935 path: None,
1936 };
1937 let result = execute(&cli).await.unwrap();
1938 assert!(result.contains("Created session"));
1939 }
1940
1941 #[tokio::test]
1942 async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
1943 let node = start_test_server().await;
1944 let cli = Cli {
1945 node,
1946 token: None,
1947 command: Some(Commands::Spawn {
1948 name: Some("full-opts".into()),
1949 workdir: Some("/tmp/repo".into()),
1950 ink: Some("coder".into()),
1951 description: Some("Full options".into()),
1952 detach: true,
1953 idle_threshold: Some(120),
1954 auto: false,
1955 worktree: true,
1956 runtime: Some("docker".into()),
1957 secret: vec![],
1958 command: vec!["claude".into()],
1959 }),
1960 path: None,
1961 };
1962 let result = execute(&cli).await.unwrap();
1963 assert!(result.contains("Created session"));
1964 }
1965
1966 #[tokio::test]
1967 async fn test_execute_spawn_no_name_derives_from_workdir() {
1968 let node = start_test_server().await;
1969 let cli = Cli {
1970 node,
1971 token: None,
1972 command: Some(Commands::Spawn {
1973 name: None,
1974 workdir: Some("/tmp/my-project".into()),
1975 ink: None,
1976 description: None,
1977 detach: true,
1978 idle_threshold: None,
1979 auto: false,
1980 worktree: false,
1981 runtime: None,
1982 secret: vec![],
1983 command: vec!["echo".into(), "hello".into()],
1984 }),
1985 path: None,
1986 };
1987 let result = execute(&cli).await.unwrap();
1988 assert!(result.contains("Created session"));
1989 }
1990
1991 #[tokio::test]
1992 async fn test_execute_spawn_no_command() {
1993 let node = start_test_server().await;
1994 let cli = Cli {
1995 node,
1996 token: None,
1997 command: Some(Commands::Spawn {
1998 name: Some("test".into()),
1999 workdir: Some("/tmp/repo".into()),
2000 ink: None,
2001 description: None,
2002 detach: true,
2003 idle_threshold: None,
2004 auto: false,
2005 worktree: false,
2006 runtime: None,
2007 secret: vec![],
2008 command: vec![],
2009 }),
2010 path: None,
2011 };
2012 let result = execute(&cli).await.unwrap();
2013 assert!(result.contains("Created session"));
2014 }
2015
2016 #[tokio::test]
2017 async fn test_execute_spawn_with_name() {
2018 let node = start_test_server().await;
2019 let cli = Cli {
2020 node,
2021 token: None,
2022 command: Some(Commands::Spawn {
2023 name: Some("my-task".into()),
2024 workdir: Some("/tmp/repo".into()),
2025 ink: None,
2026 description: None,
2027 detach: true,
2028 idle_threshold: None,
2029 auto: false,
2030 worktree: false,
2031 runtime: None,
2032 secret: vec![],
2033 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2034 }),
2035 path: None,
2036 };
2037 let result = execute(&cli).await.unwrap();
2038 assert!(result.contains("Created session"));
2039 }
2040
2041 #[tokio::test]
2042 async fn test_execute_spawn_auto_attach() {
2043 let node = start_test_server().await;
2044 let cli = Cli {
2045 node,
2046 token: None,
2047 command: Some(Commands::Spawn {
2048 name: Some("test".into()),
2049 workdir: Some("/tmp/repo".into()),
2050 ink: None,
2051 description: None,
2052 detach: false,
2053 idle_threshold: None,
2054 auto: false,
2055 worktree: false,
2056 runtime: None,
2057 secret: vec![],
2058 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2059 }),
2060 path: None,
2061 };
2062 let result = execute(&cli).await.unwrap();
2063 assert!(result.contains("Detached from session"));
2065 }
2066
2067 #[tokio::test]
2068 async fn test_execute_kill_success() {
2069 let node = start_test_server().await;
2070 let cli = Cli {
2071 node,
2072 token: None,
2073 command: Some(Commands::Kill {
2074 name: "test-session".into(),
2075 }),
2076 path: None,
2077 };
2078 let result = execute(&cli).await.unwrap();
2079 assert!(result.contains("killed"));
2080 }
2081
2082 #[tokio::test]
2083 async fn test_execute_delete_success() {
2084 let node = start_test_server().await;
2085 let cli = Cli {
2086 node,
2087 token: None,
2088 command: Some(Commands::Delete {
2089 name: "test-session".into(),
2090 }),
2091 path: None,
2092 };
2093 let result = execute(&cli).await.unwrap();
2094 assert!(result.contains("deleted"));
2095 }
2096
2097 #[tokio::test]
2098 async fn test_execute_logs_success() {
2099 let node = start_test_server().await;
2100 let cli = Cli {
2101 node,
2102 token: None,
2103 command: Some(Commands::Logs {
2104 name: "test-session".into(),
2105 lines: 50,
2106 follow: false,
2107 }),
2108 path: None,
2109 };
2110 let result = execute(&cli).await.unwrap();
2111 assert!(result.contains("test output"));
2112 }
2113
2114 #[tokio::test]
2115 async fn test_execute_list_connection_refused() {
2116 let cli = Cli {
2117 node: "localhost:1".into(),
2118 token: None,
2119 command: Some(Commands::List { all: false }),
2120 path: None,
2121 };
2122 let result = execute(&cli).await;
2123 let err = result.unwrap_err().to_string();
2124 assert!(
2125 err.contains("Could not connect to pulpod"),
2126 "Expected friendly error, got: {err}"
2127 );
2128 assert!(err.contains("localhost:1"));
2129 }
2130
2131 #[tokio::test]
2132 async fn test_execute_nodes_connection_refused() {
2133 let cli = Cli {
2134 node: "localhost:1".into(),
2135 token: None,
2136 command: Some(Commands::Nodes),
2137 path: None,
2138 };
2139 let result = execute(&cli).await;
2140 let err = result.unwrap_err().to_string();
2141 assert!(err.contains("Could not connect to pulpod"));
2142 }
2143
2144 #[tokio::test]
2145 async fn test_execute_kill_error_response() {
2146 use axum::{Router, http::StatusCode, routing::post};
2147
2148 let app = Router::new().route(
2149 "/api/v1/sessions/{id}/kill",
2150 post(|| async {
2151 (
2152 StatusCode::NOT_FOUND,
2153 "{\"error\":\"session not found: test-session\"}",
2154 )
2155 }),
2156 );
2157 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2158 let addr = listener.local_addr().unwrap();
2159 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2160 let node = format!("127.0.0.1:{}", addr.port());
2161
2162 let cli = Cli {
2163 node,
2164 token: None,
2165 command: Some(Commands::Kill {
2166 name: "test-session".into(),
2167 }),
2168 path: None,
2169 };
2170 let err = execute(&cli).await.unwrap_err();
2171 assert_eq!(err.to_string(), "session not found: test-session");
2172 }
2173
2174 #[tokio::test]
2175 async fn test_execute_delete_error_response() {
2176 use axum::{Router, http::StatusCode, routing::delete};
2177
2178 let app = Router::new().route(
2179 "/api/v1/sessions/{id}",
2180 delete(|| async {
2181 (
2182 StatusCode::CONFLICT,
2183 "{\"error\":\"cannot delete session in 'running' state\"}",
2184 )
2185 }),
2186 );
2187 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2188 let addr = listener.local_addr().unwrap();
2189 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2190 let node = format!("127.0.0.1:{}", addr.port());
2191
2192 let cli = Cli {
2193 node,
2194 token: None,
2195 command: Some(Commands::Delete {
2196 name: "test-session".into(),
2197 }),
2198 path: None,
2199 };
2200 let err = execute(&cli).await.unwrap_err();
2201 assert_eq!(err.to_string(), "cannot delete session in 'running' state");
2202 }
2203
2204 #[tokio::test]
2205 async fn test_execute_logs_error_response() {
2206 use axum::{Router, http::StatusCode, routing::get};
2207
2208 let app = Router::new().route(
2209 "/api/v1/sessions/{id}/output",
2210 get(|| async {
2211 (
2212 StatusCode::NOT_FOUND,
2213 "{\"error\":\"session not found: ghost\"}",
2214 )
2215 }),
2216 );
2217 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2218 let addr = listener.local_addr().unwrap();
2219 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2220 let node = format!("127.0.0.1:{}", addr.port());
2221
2222 let cli = Cli {
2223 node,
2224 token: None,
2225 command: Some(Commands::Logs {
2226 name: "ghost".into(),
2227 lines: 50,
2228 follow: false,
2229 }),
2230 path: None,
2231 };
2232 let err = execute(&cli).await.unwrap_err();
2233 assert_eq!(err.to_string(), "session not found: ghost");
2234 }
2235
2236 #[tokio::test]
2237 async fn test_execute_resume_error_response() {
2238 use axum::{Router, http::StatusCode, routing::post};
2239
2240 let app = Router::new().route(
2241 "/api/v1/sessions/{id}/resume",
2242 post(|| async {
2243 (
2244 StatusCode::BAD_REQUEST,
2245 "{\"error\":\"session is not lost (status: active)\"}",
2246 )
2247 }),
2248 );
2249 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2250 let addr = listener.local_addr().unwrap();
2251 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2252 let node = format!("127.0.0.1:{}", addr.port());
2253
2254 let cli = Cli {
2255 node,
2256 token: None,
2257 command: Some(Commands::Resume {
2258 name: "test-session".into(),
2259 }),
2260 path: None,
2261 };
2262 let err = execute(&cli).await.unwrap_err();
2263 assert_eq!(err.to_string(), "session is not lost (status: active)");
2264 }
2265
2266 #[tokio::test]
2267 async fn test_execute_spawn_error_response() {
2268 use axum::{Router, http::StatusCode, routing::post};
2269
2270 let app = Router::new().route(
2271 "/api/v1/sessions",
2272 post(|| async {
2273 (
2274 StatusCode::INTERNAL_SERVER_ERROR,
2275 "{\"error\":\"failed to spawn session\"}",
2276 )
2277 }),
2278 );
2279 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2280 let addr = listener.local_addr().unwrap();
2281 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2282 let node = format!("127.0.0.1:{}", addr.port());
2283
2284 let cli = Cli {
2285 node,
2286 token: None,
2287 command: Some(Commands::Spawn {
2288 name: Some("test".into()),
2289 workdir: Some("/tmp/repo".into()),
2290 ink: None,
2291 description: None,
2292 detach: true,
2293 idle_threshold: None,
2294 auto: false,
2295 worktree: false,
2296 runtime: None,
2297 secret: vec![],
2298 command: vec!["test".into()],
2299 }),
2300 path: None,
2301 };
2302 let err = execute(&cli).await.unwrap_err();
2303 assert_eq!(err.to_string(), "failed to spawn session");
2304 }
2305
2306 #[tokio::test]
2307 async fn test_execute_interventions_error_response() {
2308 use axum::{Router, http::StatusCode, routing::get};
2309
2310 let app = Router::new().route(
2311 "/api/v1/sessions/{id}/interventions",
2312 get(|| async {
2313 (
2314 StatusCode::NOT_FOUND,
2315 "{\"error\":\"session not found: ghost\"}",
2316 )
2317 }),
2318 );
2319 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2320 let addr = listener.local_addr().unwrap();
2321 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2322 let node = format!("127.0.0.1:{}", addr.port());
2323
2324 let cli = Cli {
2325 node,
2326 token: None,
2327 command: Some(Commands::Interventions {
2328 name: "ghost".into(),
2329 }),
2330 path: None,
2331 };
2332 let err = execute(&cli).await.unwrap_err();
2333 assert_eq!(err.to_string(), "session not found: ghost");
2334 }
2335
2336 #[tokio::test]
2337 async fn test_execute_resume_success() {
2338 let node = start_test_server().await;
2339 let cli = Cli {
2340 node,
2341 token: None,
2342 command: Some(Commands::Resume {
2343 name: "test-session".into(),
2344 }),
2345 path: None,
2346 };
2347 let result = execute(&cli).await.unwrap();
2348 assert!(result.contains("Detached from session"));
2349 }
2350
2351 #[tokio::test]
2352 async fn test_execute_input_success() {
2353 let node = start_test_server().await;
2354 let cli = Cli {
2355 node,
2356 token: None,
2357 command: Some(Commands::Input {
2358 name: "test-session".into(),
2359 text: Some("yes".into()),
2360 }),
2361 path: None,
2362 };
2363 let result = execute(&cli).await.unwrap();
2364 assert!(result.contains("Sent input to session test-session"));
2365 }
2366
2367 #[tokio::test]
2368 async fn test_execute_input_no_text() {
2369 let node = start_test_server().await;
2370 let cli = Cli {
2371 node,
2372 token: None,
2373 command: Some(Commands::Input {
2374 name: "test-session".into(),
2375 text: None,
2376 }),
2377 path: None,
2378 };
2379 let result = execute(&cli).await.unwrap();
2380 assert!(result.contains("Sent input to session test-session"));
2381 }
2382
2383 #[tokio::test]
2384 async fn test_execute_input_connection_refused() {
2385 let cli = Cli {
2386 node: "localhost:1".into(),
2387 token: None,
2388 command: Some(Commands::Input {
2389 name: "test".into(),
2390 text: Some("y".into()),
2391 }),
2392 path: None,
2393 };
2394 let result = execute(&cli).await;
2395 let err = result.unwrap_err().to_string();
2396 assert!(err.contains("Could not connect to pulpod"));
2397 }
2398
2399 #[tokio::test]
2400 async fn test_execute_input_error_response() {
2401 use axum::{Router, http::StatusCode, routing::post};
2402
2403 let app = Router::new().route(
2404 "/api/v1/sessions/{id}/input",
2405 post(|| async {
2406 (
2407 StatusCode::NOT_FOUND,
2408 "{\"error\":\"session not found: ghost\"}",
2409 )
2410 }),
2411 );
2412 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2413 let addr = listener.local_addr().unwrap();
2414 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2415 let node = format!("127.0.0.1:{}", addr.port());
2416
2417 let cli = Cli {
2418 node,
2419 token: None,
2420 command: Some(Commands::Input {
2421 name: "ghost".into(),
2422 text: Some("y".into()),
2423 }),
2424 path: None,
2425 };
2426 let err = execute(&cli).await.unwrap_err();
2427 assert_eq!(err.to_string(), "session not found: ghost");
2428 }
2429
2430 #[tokio::test]
2431 async fn test_execute_ui() {
2432 let cli = Cli {
2433 node: "localhost:7433".into(),
2434 token: None,
2435 command: Some(Commands::Ui),
2436 path: None,
2437 };
2438 let result = execute(&cli).await.unwrap();
2439 assert!(result.contains("Opening"));
2440 assert!(result.contains("http://localhost:7433"));
2441 }
2442
2443 #[tokio::test]
2444 async fn test_execute_ui_custom_node() {
2445 let cli = Cli {
2446 node: "mac-mini:7433".into(),
2447 token: None,
2448 command: Some(Commands::Ui),
2449 path: None,
2450 };
2451 let result = execute(&cli).await.unwrap();
2452 assert!(result.contains("http://mac-mini:7433"));
2453 }
2454
/// An empty session slice renders as the "No sessions." placeholder.
#[test]
fn test_format_sessions_empty() {
    let rendered = format_sessions(&[]);
    assert_eq!(rendered, "No sessions.");
}
2459
/// A single active tmux session renders a table with the expected column headers and the
/// session's short id, name, status, runtime and command.
#[test]
fn test_format_sessions_with_data() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "my-api".into(),
        workdir: "/tmp/repo".into(),
        command: "claude -p 'Fix the bug'".into(),
        description: Some("Fix the bug".into()),
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let table = format_sessions(&[session]);

    let expected_fragments = [
        // Column headers.
        "ID",
        "NAME",
        "RUNTIME",
        "COMMAND",
        // Row values.
        "00000000",
        "my-api",
        "active",
        "tmux",
        "claude -p 'Fix the bug'",
    ];
    for fragment in expected_fragments {
        assert!(table.contains(fragment));
    }
}
2500
/// A session whose runtime is Docker must have "docker" somewhere in the rendered table.
#[test]
fn test_format_sessions_docker_runtime() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let docker_session = Session {
        id: Uuid::nil(),
        name: "sandbox-test".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: Some("docker:pulpo-sandbox-test".into()),
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Docker,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let table = format_sessions(&[docker_session]);
    assert!(table.contains("docker"));
}
2533
/// A very long command is expected to be rendered with an ellipsis ("...").
#[test]
fn test_format_sessions_long_command_truncated() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let long_command =
        "claude -p 'A very long command that exceeds fifty characters in total length here'";
    let session = Session {
        id: Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        command: long_command.into(),
        description: None,
        status: SessionStatus::Ready,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let table = format_sessions(&[session]);
    assert!(table.contains("..."));
}
2568
/// A session with a worktree path must show the `[wt]` marker right after its name.
#[test]
fn test_format_sessions_worktree_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let worktree_session = Session {
        id: Uuid::nil(),
        name: "wt-task".into(),
        workdir: "/repo/.pulpo/worktrees/wt-task".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: Some("/repo/.pulpo/worktrees/wt-task".into()),
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let output = format_sessions(&[worktree_session]);

    let has_marker = output.contains("[wt]");
    assert!(has_marker, "should show worktree indicator: {output}");
    // The marker is attached to the name column, separated by one space.
    assert!(output.contains("wt-task [wt]"));
}
2605
/// A session whose metadata carries a `pr_url` must show the `[PR]` marker after its name.
#[test]
fn test_format_sessions_pr_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use std::collections::HashMap;
    use uuid::Uuid;

    let pr_metadata = HashMap::from([("pr_url".into(), "https://github.com/a/b/pull/1".into())]);
    let pr_session = Session {
        id: Uuid::nil(),
        name: "pr-task".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: Some(pr_metadata),
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let output = format_sessions(&[pr_session]);

    let has_marker = output.contains("[PR]");
    assert!(has_marker, "should show PR indicator: {output}");
    assert!(output.contains("pr-task [PR]"));
}
2645
/// With both a worktree path and a `pr_url`, the markers appear together as `[wt] [PR]`.
#[test]
fn test_format_sessions_worktree_and_pr_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use std::collections::HashMap;
    use uuid::Uuid;

    let metadata = {
        let mut map = HashMap::new();
        map.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
        map
    };
    let session = Session {
        id: Uuid::nil(),
        name: "both-task".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: Some(metadata),
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: Some("/repo/.pulpo/worktrees/both-task".into()),
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let output = format_sessions(&[session]);

    let has_both = output.contains("[wt] [PR]");
    assert!(has_both, "should show both indicators: {output}");
}
2684
/// Without any metadata the `[PR]` marker must not be rendered.
#[test]
fn test_format_sessions_no_pr_without_metadata() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let plain_session = Session {
        id: Uuid::nil(),
        name: "no-pr".into(),
        workdir: "/tmp".into(),
        command: "claude".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let output = format_sessions(&[plain_session]);

    let shows_marker = output.contains("[PR]");
    assert!(!shows_marker, "should not show PR indicator: {output}");
}
2720
/// The node listing must include the local node (tagged `(local)`), the
/// configured peer, and the peer's session count.
#[test]
fn test_format_nodes() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let resp = PeersResponse {
        local: NodeInfo {
            name: "mac-mini".into(),
            hostname: "h".into(),
            os: "macos".into(),
            arch: "arm64".into(),
            cpus: 8,
            memory_mb: 16384,
            gpu: None,
        },
        peers: vec![PeerInfo {
            name: "win-pc".into(),
            address: "win-pc:7433".into(),
            status: PeerStatus::Online,
            node_info: None,
            // Use a count whose digit does not occur anywhere else in this
            // fixture (the address "win-pc:7433" already contains a '3'), so
            // the assertion below can only pass if the count itself is shown.
            session_count: Some(5),
            source: PeerSource::Configured,
        }],
    };
    let output = format_nodes(&resp);
    assert!(output.contains("mac-mini"));
    assert!(output.contains("(local)"));
    assert!(output.contains("win-pc"));
    assert!(
        output.contains('5'),
        "peer session count should be rendered: {output}"
    );
}
2751
/// An offline peer with no known session count shows its status and a `-`
/// placeholder on the third output line.
#[test]
fn test_format_nodes_no_session_count() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "local".into(),
        hostname: "h".into(),
        os: "linux".into(),
        arch: "x86_64".into(),
        cpus: 4,
        memory_mb: 8192,
        gpu: None,
    };
    let offline_peer = PeerInfo {
        name: "peer".into(),
        address: "peer:7433".into(),
        status: PeerStatus::Offline,
        node_info: None,
        session_count: None,
        source: PeerSource::Configured,
    };
    let resp = PeersResponse {
        local,
        peers: vec![offline_peer],
    };

    let output = format_nodes(&resp);
    assert!(output.contains("offline"));
    // Index 2 is the line this test inspects for the placeholder.
    let third_line = output.lines().nth(2).unwrap();
    assert!(third_line.contains('-'));
}
2782
2783 #[tokio::test]
2784 async fn test_execute_resume_connection_refused() {
2785 let cli = Cli {
2786 node: "localhost:1".into(),
2787 token: None,
2788 command: Some(Commands::Resume {
2789 name: "test".into(),
2790 }),
2791 path: None,
2792 };
2793 let result = execute(&cli).await;
2794 let err = result.unwrap_err().to_string();
2795 assert!(err.contains("Could not connect to pulpod"));
2796 }
2797
2798 #[tokio::test]
2799 async fn test_execute_spawn_connection_refused() {
2800 let cli = Cli {
2801 node: "localhost:1".into(),
2802 token: None,
2803 command: Some(Commands::Spawn {
2804 name: Some("test".into()),
2805 workdir: Some("/tmp".into()),
2806 ink: None,
2807 description: None,
2808 detach: true,
2809 idle_threshold: None,
2810 auto: false,
2811 worktree: false,
2812 runtime: None,
2813 secret: vec![],
2814 command: vec!["test".into()],
2815 }),
2816 path: None,
2817 };
2818 let result = execute(&cli).await;
2819 let err = result.unwrap_err().to_string();
2820 assert!(err.contains("Could not connect to pulpod"));
2821 }
2822
2823 #[tokio::test]
2824 async fn test_execute_kill_connection_refused() {
2825 let cli = Cli {
2826 node: "localhost:1".into(),
2827 token: None,
2828 command: Some(Commands::Kill {
2829 name: "test".into(),
2830 }),
2831 path: None,
2832 };
2833 let result = execute(&cli).await;
2834 let err = result.unwrap_err().to_string();
2835 assert!(err.contains("Could not connect to pulpod"));
2836 }
2837
2838 #[tokio::test]
2839 async fn test_execute_delete_connection_refused() {
2840 let cli = Cli {
2841 node: "localhost:1".into(),
2842 token: None,
2843 command: Some(Commands::Delete {
2844 name: "test".into(),
2845 }),
2846 path: None,
2847 };
2848 let result = execute(&cli).await;
2849 let err = result.unwrap_err().to_string();
2850 assert!(err.contains("Could not connect to pulpod"));
2851 }
2852
2853 #[tokio::test]
2854 async fn test_execute_logs_connection_refused() {
2855 let cli = Cli {
2856 node: "localhost:1".into(),
2857 token: None,
2858 command: Some(Commands::Logs {
2859 name: "test".into(),
2860 lines: 50,
2861 follow: false,
2862 }),
2863 path: None,
2864 };
2865 let result = execute(&cli).await;
2866 let err = result.unwrap_err().to_string();
2867 assert!(err.contains("Could not connect to pulpod"));
2868 }
2869
2870 #[tokio::test]
2871 async fn test_friendly_error_connect() {
2872 let err = reqwest::Client::new()
2874 .get("http://127.0.0.1:1")
2875 .send()
2876 .await
2877 .unwrap_err();
2878 let friendly = friendly_error(&err, "test-node:1");
2879 let msg = friendly.to_string();
2880 assert!(
2881 msg.contains("Could not connect"),
2882 "Expected connect message, got: {msg}"
2883 );
2884 }
2885
2886 #[tokio::test]
2887 async fn test_friendly_error_other() {
2888 let err = reqwest::Client::new()
2890 .get("http://[::invalid::url")
2891 .send()
2892 .await
2893 .unwrap_err();
2894 let friendly = friendly_error(&err, "bad-host");
2895 let msg = friendly.to_string();
2896 assert!(
2897 msg.contains("Network error"),
2898 "Expected network error message, got: {msg}"
2899 );
2900 assert!(msg.contains("bad-host"));
2901 }
2902
/// `is_localhost` accepts the loopback spellings (with or without port) and
/// rejects other hostnames and LAN addresses.
#[test]
fn test_is_localhost_variants() {
    let loopback_nodes = [
        "localhost:7433",
        "127.0.0.1:7433",
        "[::1]:7433",
        "::1",
        "localhost",
    ];
    for node in loopback_nodes {
        assert!(is_localhost(node), "{node} should be treated as localhost");
    }

    let remote_nodes = ["mac-mini:7433", "192.168.1.100:7433"];
    for node in remote_nodes {
        assert!(
            !is_localhost(node),
            "{node} should not be treated as localhost"
        );
    }
}
2915
/// With a token, `authed_get` attaches an `Authorization: Bearer <token>` header.
#[test]
fn test_authed_get_with_token() {
    let http = reqwest::Client::new();
    let builder = authed_get(&http, "http://h:1/api".into(), Some("tok"));
    let request = builder.build().unwrap();

    let header_value = request.headers().get("authorization").unwrap();
    assert_eq!(header_value.to_str().unwrap(), "Bearer tok");
}
2930
/// Without a token, `authed_get` must not set an `Authorization` header.
#[test]
fn test_authed_get_without_token() {
    let http = reqwest::Client::new();
    let builder = authed_get(&http, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();

    assert!(!request.headers().contains_key("authorization"));
}
2939
/// With a token, `authed_post` attaches an `Authorization: Bearer <token>` header.
#[test]
fn test_authed_post_with_token() {
    let http = reqwest::Client::new();
    let builder = authed_post(&http, "http://h:1/api".into(), Some("secret"));
    let request = builder.build().unwrap();

    let header_value = request.headers().get("authorization").unwrap();
    assert_eq!(header_value.to_str().unwrap(), "Bearer secret");
}
2954
/// Without a token, `authed_post` must not set an `Authorization` header.
#[test]
fn test_authed_post_without_token() {
    let http = reqwest::Client::new();
    let builder = authed_post(&http, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();

    assert!(!request.headers().contains_key("authorization"));
}
2963
/// With a token, `authed_delete` attaches an `Authorization: Bearer <token>` header.
#[test]
fn test_authed_delete_with_token() {
    let http = reqwest::Client::new();
    let builder = authed_delete(&http, "http://h:1/api".into(), Some("del-tok"));
    let request = builder.build().unwrap();

    let header_value = request.headers().get("authorization").unwrap();
    assert_eq!(header_value.to_str().unwrap(), "Bearer del-tok");
}
2978
/// Without a token, `authed_delete` must not set an `Authorization` header.
#[test]
fn test_authed_delete_without_token() {
    let http = reqwest::Client::new();
    let builder = authed_delete(&http, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();

    assert!(!request.headers().contains_key("authorization"));
}
2987
2988 #[tokio::test]
2989 async fn test_resolve_token_explicit() {
2990 let client = reqwest::Client::new();
2991 let token =
2992 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2993 assert_eq!(token, Some("my-tok".into()));
2994 }
2995
2996 #[tokio::test]
2997 async fn test_resolve_token_remote_no_explicit() {
2998 let client = reqwest::Client::new();
2999 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
3000 assert_eq!(token, None);
3001 }
3002
3003 #[tokio::test]
3004 async fn test_resolve_token_localhost_auto_discover() {
3005 use axum::{Json, Router, routing::get};
3006
3007 let app = Router::new().route(
3008 "/api/v1/auth/token",
3009 get(|| async {
3010 Json(AuthTokenResponse {
3011 token: "discovered".into(),
3012 })
3013 }),
3014 );
3015 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3016 let addr = listener.local_addr().unwrap();
3017 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3018
3019 let node = format!("localhost:{}", addr.port());
3020 let base = base_url(&node);
3021 let client = reqwest::Client::new();
3022 let token = resolve_token(&client, &base, &node, None).await;
3023 assert_eq!(token, Some("discovered".into()));
3024 }
3025
3026 #[tokio::test]
3027 async fn test_discover_token_empty_returns_none() {
3028 use axum::{Json, Router, routing::get};
3029
3030 let app = Router::new().route(
3031 "/api/v1/auth/token",
3032 get(|| async {
3033 Json(AuthTokenResponse {
3034 token: String::new(),
3035 })
3036 }),
3037 );
3038 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3039 let addr = listener.local_addr().unwrap();
3040 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3041
3042 let base = format!("http://127.0.0.1:{}", addr.port());
3043 let client = reqwest::Client::new();
3044 assert_eq!(discover_token(&client, &base).await, None);
3045 }
3046
3047 #[tokio::test]
3048 async fn test_discover_token_unreachable_returns_none() {
3049 let client = reqwest::Client::new();
3050 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3051 }
3052
/// `--token <value>` before the subcommand populates `Cli::token`.
#[test]
fn test_cli_parse_with_token() {
    let argv = ["pulpo", "--token", "my-secret", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert_eq!(parsed.token.as_deref(), Some("my-secret"));
}
3058
/// Omitting `--token` leaves `Cli::token` unset.
#[test]
fn test_cli_parse_without_token() {
    let argv = ["pulpo", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert!(parsed.token.is_none());
}
3064
3065 #[tokio::test]
3066 async fn test_execute_with_explicit_token_sends_header() {
3067 use axum::{Router, extract::Request, http::StatusCode, routing::get};
3068
3069 let app = Router::new().route(
3070 "/api/v1/sessions",
3071 get(|req: Request| async move {
3072 let auth = req
3073 .headers()
3074 .get("authorization")
3075 .and_then(|v| v.to_str().ok())
3076 .unwrap_or("");
3077 assert_eq!(auth, "Bearer test-token");
3078 (StatusCode::OK, "[]".to_owned())
3079 }),
3080 );
3081 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3082 let addr = listener.local_addr().unwrap();
3083 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3084 let node = format!("127.0.0.1:{}", addr.port());
3085
3086 let cli = Cli {
3087 node,
3088 token: Some("test-token".into()),
3089 command: Some(Commands::List { all: false }),
3090 path: None,
3091 };
3092 let result = execute(&cli).await.unwrap();
3093 assert_eq!(result, "No sessions.");
3094 }
3095
/// `interventions <name>` parses into `Commands::Interventions` with that name.
#[test]
fn test_cli_parse_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
    let Some(Commands::Interventions { name }) = &parsed.command else {
        panic!("expected Interventions, got {:?}", parsed.command);
    };
    assert_eq!(name.as_str(), "my-session");
}
3106
/// An empty event slice renders as the fixed "No intervention events." message.
#[test]
fn test_format_interventions_empty() {
    let rendered = format_interventions(&[]);
    assert_eq!(rendered, "No intervention events.");
}
3111
/// Two events render the column headers, both reasons and the first timestamp.
#[test]
fn test_format_interventions_with_data() {
    let memory_event = InterventionEventResponse {
        id: 1,
        session_id: "sess-1".into(),
        code: None,
        reason: "Memory exceeded threshold".into(),
        created_at: "2026-01-01T00:00:00Z".into(),
    };
    let idle_event = InterventionEventResponse {
        id: 2,
        session_id: "sess-1".into(),
        code: None,
        reason: "Idle for 10 minutes".into(),
        created_at: "2026-01-02T00:00:00Z".into(),
    };
    let table = format_interventions(&[memory_event, idle_event]);

    let expected_fragments = [
        "ID",
        "TIMESTAMP",
        "REASON",
        "Memory exceeded threshold",
        "Idle for 10 minutes",
        "2026-01-01T00:00:00Z",
    ];
    for fragment in expected_fragments {
        assert!(
            table.contains(fragment),
            "missing {fragment:?} in: {table}"
        );
    }
}
3138
3139 #[tokio::test]
3140 async fn test_execute_interventions_empty() {
3141 let node = start_test_server().await;
3142 let cli = Cli {
3143 node,
3144 token: None,
3145 command: Some(Commands::Interventions {
3146 name: "my-session".into(),
3147 }),
3148 path: None,
3149 };
3150 let result = execute(&cli).await.unwrap();
3151 assert_eq!(result, "No intervention events.");
3152 }
3153
3154 #[tokio::test]
3155 async fn test_execute_interventions_with_data() {
3156 use axum::{Router, routing::get};
3157
3158 let app = Router::new().route(
3159 "/api/v1/sessions/{id}/interventions",
3160 get(|| async {
3161 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3162 .to_owned()
3163 }),
3164 );
3165 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3166 let addr = listener.local_addr().unwrap();
3167 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3168 let node = format!("127.0.0.1:{}", addr.port());
3169
3170 let cli = Cli {
3171 node,
3172 token: None,
3173 command: Some(Commands::Interventions {
3174 name: "test".into(),
3175 }),
3176 path: None,
3177 };
3178 let result = execute(&cli).await.unwrap();
3179 assert!(result.contains("OOM"));
3180 assert!(result.contains("2026-01-01T00:00:00Z"));
3181 }
3182
3183 #[tokio::test]
3184 async fn test_execute_interventions_connection_refused() {
3185 let cli = Cli {
3186 node: "localhost:1".into(),
3187 token: None,
3188 command: Some(Commands::Interventions {
3189 name: "test".into(),
3190 }),
3191 path: None,
3192 };
3193 let result = execute(&cli).await;
3194 let err = result.unwrap_err().to_string();
3195 assert!(err.contains("Could not connect to pulpod"));
3196 }
3197
/// A plain backend id produces `tmux attach-session -t <id>`.
#[test]
fn test_build_attach_command_tmux() {
    let attach = build_attach_command("my-session");
    assert_eq!(attach.get_program(), "tmux");

    let argv = attach.get_args().collect::<Vec<&std::ffi::OsStr>>();
    assert_eq!(argv, ["attach-session", "-t", "my-session"]);
}
3207
/// A `docker:`-prefixed backend id produces `docker exec -it <container> /bin/sh`.
#[test]
fn test_build_attach_command_docker() {
    let attach = build_attach_command("docker:pulpo-my-task");
    assert_eq!(attach.get_program(), "docker");

    let argv = attach.get_args().collect::<Vec<&std::ffi::OsStr>>();
    assert_eq!(argv, ["exec", "-it", "pulpo-my-task", "/bin/sh"]);
}
3215
/// `attach <name>` parses into `Commands::Attach` with that name.
#[test]
fn test_cli_parse_attach() {
    let parsed = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
    let Some(Commands::Attach { name }) = &parsed.command else {
        panic!("expected Attach, got {:?}", parsed.command);
    };
    assert_eq!(name.as_str(), "my-session");
}
3224
/// The short alias `a` parses into `Commands::Attach` just like `attach`.
#[test]
fn test_cli_parse_attach_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
    let Some(Commands::Attach { name }) = &parsed.command else {
        panic!("expected Attach, got {:?}", parsed.command);
    };
    assert_eq!(name.as_str(), "my-session");
}
3233
3234 #[tokio::test]
3235 async fn test_execute_attach_success() {
3236 let node = start_test_server().await;
3237 let cli = Cli {
3238 node,
3239 token: None,
3240 command: Some(Commands::Attach {
3241 name: "test-session".into(),
3242 }),
3243 path: None,
3244 };
3245 let result = execute(&cli).await.unwrap();
3246 assert!(result.contains("Detached from session test-session"));
3247 }
3248
3249 #[tokio::test]
3250 async fn test_execute_attach_with_backend_session_id() {
3251 use axum::{Router, routing::get};
3252 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3253 let app = Router::new().route(
3254 "/api/v1/sessions/{id}",
3255 get(move || async move { session_json.to_owned() }),
3256 );
3257 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3258 let addr = listener.local_addr().unwrap();
3259 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3260
3261 let cli = Cli {
3262 node: format!("127.0.0.1:{}", addr.port()),
3263 token: None,
3264 command: Some(Commands::Attach {
3265 name: "my-session".into(),
3266 }),
3267 path: None,
3268 };
3269 let result = execute(&cli).await.unwrap();
3270 assert!(result.contains("Detached from session my-session"));
3271 }
3272
3273 #[tokio::test]
3274 async fn test_execute_attach_connection_refused() {
3275 let cli = Cli {
3276 node: "localhost:1".into(),
3277 token: None,
3278 command: Some(Commands::Attach {
3279 name: "test-session".into(),
3280 }),
3281 path: None,
3282 };
3283 let result = execute(&cli).await;
3284 let err = result.unwrap_err().to_string();
3285 assert!(err.contains("Could not connect to pulpod"));
3286 }
3287
3288 #[tokio::test]
3289 async fn test_execute_attach_error_response() {
3290 use axum::{Router, http::StatusCode, routing::get};
3291 let app = Router::new().route(
3292 "/api/v1/sessions/{id}",
3293 get(|| async {
3294 (
3295 StatusCode::NOT_FOUND,
3296 r#"{"error":"session not found"}"#.to_owned(),
3297 )
3298 }),
3299 );
3300 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3301 let addr = listener.local_addr().unwrap();
3302 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3303
3304 let cli = Cli {
3305 node: format!("127.0.0.1:{}", addr.port()),
3306 token: None,
3307 command: Some(Commands::Attach {
3308 name: "nonexistent".into(),
3309 }),
3310 path: None,
3311 };
3312 let result = execute(&cli).await;
3313 let err = result.unwrap_err().to_string();
3314 assert!(err.contains("session not found"));
3315 }
3316
3317 #[tokio::test]
3318 async fn test_execute_attach_stale_session() {
3319 use axum::{Router, routing::get};
3320 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3321 let app = Router::new().route(
3322 "/api/v1/sessions/{id}",
3323 get(move || async move { session_json.to_owned() }),
3324 );
3325 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3326 let addr = listener.local_addr().unwrap();
3327 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3328
3329 let cli = Cli {
3330 node: format!("127.0.0.1:{}", addr.port()),
3331 token: None,
3332 command: Some(Commands::Attach {
3333 name: "stale-sess".into(),
3334 }),
3335 path: None,
3336 };
3337 let result = execute(&cli).await;
3338 let err = result.unwrap_err().to_string();
3339 assert!(err.contains("lost"));
3340 assert!(err.contains("pulpo resume"));
3341 }
3342
3343 #[tokio::test]
3344 async fn test_execute_attach_dead_session() {
3345 use axum::{Router, routing::get};
3346 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3347 let app = Router::new().route(
3348 "/api/v1/sessions/{id}",
3349 get(move || async move { session_json.to_owned() }),
3350 );
3351 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3352 let addr = listener.local_addr().unwrap();
3353 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3354
3355 let cli = Cli {
3356 node: format!("127.0.0.1:{}", addr.port()),
3357 token: None,
3358 command: Some(Commands::Attach {
3359 name: "dead-sess".into(),
3360 }),
3361 path: None,
3362 };
3363 let result = execute(&cli).await;
3364 let err = result.unwrap_err().to_string();
3365 assert!(err.contains("killed"));
3366 assert!(err.contains("cannot attach"));
3367 }
3368
/// The short alias `s` parses into `Commands::Spawn`, including a trailing `-- <command>`.
#[test]
fn test_cli_parse_alias_spawn() {
    let argv = ["pulpo", "s", "my-task", "--", "echo", "hello"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_spawn = matches!(&parsed.command, Some(Commands::Spawn { .. }));
    assert!(is_spawn);
}
3376
/// The alias `ls` parses into `Commands::List` with `all` left off.
#[test]
fn test_cli_parse_alias_list() {
    let parsed = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
    let is_plain_list = matches!(&parsed.command, Some(Commands::List { all: false }));
    assert!(is_plain_list);
}
3382
/// Both `ls -a` and `list --all` parse into `Commands::List` with `all` set.
#[test]
fn test_cli_parse_list_all() {
    let invocations = [["pulpo", "ls", "-a"], ["pulpo", "list", "--all"]];
    for argv in invocations {
        let parsed = Cli::try_parse_from(argv).unwrap();
        assert!(matches!(
            &parsed.command,
            Some(Commands::List { all: true })
        ));
    }
}
3391
/// The short alias `l` parses into `Commands::Logs` for the given session name.
#[test]
fn test_cli_parse_alias_logs() {
    let parsed = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
    let Some(Commands::Logs { name, .. }) = &parsed.command else {
        panic!("expected Logs, got {:?}", parsed.command);
    };
    assert_eq!(name.as_str(), "my-session");
}
3400
/// The short alias `k` parses into `Commands::Kill` for the given session name.
#[test]
fn test_cli_parse_alias_kill() {
    let parsed = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
    let Some(Commands::Kill { name }) = &parsed.command else {
        panic!("expected Kill, got {:?}", parsed.command);
    };
    assert_eq!(name.as_str(), "my-session");
}
3409
/// The alias `rm` parses into `Commands::Delete` for the given session name.
#[test]
fn test_cli_parse_alias_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
    let Some(Commands::Delete { name }) = &parsed.command else {
        panic!("expected Delete, got {:?}", parsed.command);
    };
    assert_eq!(name.as_str(), "my-session");
}
3418
/// The short alias `r` parses into `Commands::Resume` for the given session name.
#[test]
fn test_cli_parse_alias_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
    let Some(Commands::Resume { name }) = &parsed.command else {
        panic!("expected Resume, got {:?}", parsed.command);
    };
    assert_eq!(name.as_str(), "my-session");
}
3427
/// The short alias `n` parses into `Commands::Nodes`.
#[test]
fn test_cli_parse_alias_nodes() {
    let parsed = Cli::try_parse_from(["pulpo", "n"]).unwrap();
    let is_nodes = matches!(&parsed.command, Some(Commands::Nodes));
    assert!(is_nodes);
}
3433
/// The alias `iv` parses into `Commands::Interventions` for the given session name.
#[test]
fn test_cli_parse_alias_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
    let Some(Commands::Interventions { name }) = &parsed.command else {
        panic!("expected Interventions, got {:?}", parsed.command);
    };
    assert_eq!(name.as_str(), "my-session");
}
3442
/// A JSON body with an `error` field yields an error whose message is that field's value.
#[test]
fn test_api_error_json() {
    let body = r#"{"error":"session not found: foo"}"#;
    let message = api_error(body).to_string();
    assert_eq!(message, "session not found: foo");
}
3448
/// A non-JSON body is used verbatim as the error message.
#[test]
fn test_api_error_plain_text() {
    let body = "plain text error";
    let message = api_error(body).to_string();
    assert_eq!(message, "plain text error");
}
3454
/// With no previous output, the whole new output is the diff.
#[test]
fn test_diff_output_empty_prev() {
    let new = "line1\nline2\n";
    let delta = diff_output("", new);
    assert_eq!(delta, "line1\nline2\n");
}
3461
/// Identical previous and new output produce an empty diff.
#[test]
fn test_diff_output_identical() {
    let snapshot = "line1\nline2";
    let delta = diff_output(snapshot, snapshot);
    assert_eq!(delta, "");
}
3466
/// When the new output extends the previous one, only the appended lines are returned.
#[test]
fn test_diff_output_new_lines_appended() {
    let delta = diff_output("line1\nline2", "line1\nline2\nline3\nline4");
    assert_eq!(delta, "line3\nline4");
}
3473
/// When the window has scrolled by one line (the oldest line dropped, one
/// new line added), only the new line is returned.
#[test]
fn test_diff_output_scrolled_window() {
    let delta = diff_output("line1\nline2\nline3", "line2\nline3\nline4");
    assert_eq!(delta, "line4");
}
3481
/// With no lines in common, the entire new output is returned.
#[test]
fn test_diff_output_completely_different() {
    let delta = diff_output("aaa\nbbb", "xxx\nyyy");
    assert_eq!(delta, "xxx\nyyy");
}
3488
/// The previous last line ("common") reappears in the new output, but the
/// line before it differs, so the entire new output is returned.
#[test]
fn test_diff_output_last_line_matches_but_overlap_fails() {
    let delta = diff_output("aaa\ncommon", "zzz\ncommon\nnew_line");
    assert_eq!(delta, "zzz\ncommon\nnew_line");
}
3499
/// An empty new output produces an empty diff regardless of the previous output.
#[test]
fn test_diff_output_new_empty() {
    let delta = diff_output("line1", "");
    assert_eq!(delta, "");
}
3504
3505 async fn start_follow_test_server() -> String {
3510 use axum::{Router, extract::Path, extract::Query, routing::get};
3511 use std::sync::Arc;
3512 use std::sync::atomic::{AtomicUsize, Ordering};
3513
3514 let call_count = Arc::new(AtomicUsize::new(0));
3515 let output_count = call_count.clone();
3516
3517 let app = Router::new()
3518 .route(
3519 "/api/v1/sessions/{id}/output",
3520 get(
3521 move |_path: Path<String>,
3522 _query: Query<std::collections::HashMap<String, String>>| {
3523 let count = output_count.clone();
3524 async move {
3525 let n = count.fetch_add(1, Ordering::SeqCst);
3526 let output = match n {
3527 0 => "line1\nline2".to_owned(),
3528 1 => "line1\nline2\nline3".to_owned(),
3529 _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
3530 };
3531 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
3532 }
3533 },
3534 ),
3535 )
3536 .route(
3537 "/api/v1/sessions/{id}",
3538 get(|_path: Path<String>| async {
3539 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3540 }),
3541 );
3542
3543 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3544 let addr = listener.local_addr().unwrap();
3545 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3546 format!("http://127.0.0.1:{}", addr.port())
3547 }
3548
3549 #[tokio::test]
3550 async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
3551 let base = start_follow_test_server().await;
3552 let client = reqwest::Client::new();
3553 let mut buf = Vec::new();
3554
3555 follow_logs(&client, &base, "test", 100, None, &mut buf)
3556 .await
3557 .unwrap();
3558
3559 let output = String::from_utf8(buf).unwrap();
3560 assert!(output.contains("line1"));
3562 assert!(output.contains("line2"));
3563 assert!(output.contains("line3"));
3564 assert!(output.contains("line4"));
3565 assert!(output.contains("[pulpo] Agent exited"));
3566 }
3567
3568 #[tokio::test]
3569 async fn test_execute_logs_follow_success() {
3570 let base = start_follow_test_server().await;
3571 let node = base.strip_prefix("http://").unwrap().to_owned();
3573
3574 let cli = Cli {
3575 node,
3576 token: None,
3577 command: Some(Commands::Logs {
3578 name: "test".into(),
3579 lines: 100,
3580 follow: true,
3581 }),
3582 path: None,
3583 };
3584 let result = execute(&cli).await.unwrap();
3586 assert_eq!(result, "");
3587 }
3588
3589 #[tokio::test]
3590 async fn test_execute_logs_follow_connection_refused() {
3591 let cli = Cli {
3592 node: "localhost:1".into(),
3593 token: None,
3594 command: Some(Commands::Logs {
3595 name: "test".into(),
3596 lines: 50,
3597 follow: true,
3598 }),
3599 path: None,
3600 };
3601 let result = execute(&cli).await;
3602 let err = result.unwrap_err().to_string();
3603 assert!(
3604 err.contains("Could not connect to pulpod"),
3605 "Expected friendly error, got: {err}"
3606 );
3607 }
3608
3609 #[tokio::test]
3610 async fn test_follow_logs_exits_on_dead() {
3611 use axum::{Router, extract::Path, extract::Query, routing::get};
3612
3613 let app = Router::new()
3614 .route(
3615 "/api/v1/sessions/{id}/output",
3616 get(
3617 |_path: Path<String>,
3618 _query: Query<std::collections::HashMap<String, String>>| async {
3619 r#"{"output":"some output"}"#.to_owned()
3620 },
3621 ),
3622 )
3623 .route(
3624 "/api/v1/sessions/{id}",
3625 get(|_path: Path<String>| async {
3626 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3627 }),
3628 );
3629
3630 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3631 let addr = listener.local_addr().unwrap();
3632 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3633 let base = format!("http://127.0.0.1:{}", addr.port());
3634
3635 let client = reqwest::Client::new();
3636 let mut buf = Vec::new();
3637 follow_logs(&client, &base, "test", 100, None, &mut buf)
3638 .await
3639 .unwrap();
3640
3641 let output = String::from_utf8(buf).unwrap();
3642 assert!(output.contains("some output"));
3643 }
3644
3645 #[tokio::test]
3646 async fn test_follow_logs_exits_on_stale() {
3647 use axum::{Router, extract::Path, extract::Query, routing::get};
3648
3649 let app = Router::new()
3650 .route(
3651 "/api/v1/sessions/{id}/output",
3652 get(
3653 |_path: Path<String>,
3654 _query: Query<std::collections::HashMap<String, String>>| async {
3655 r#"{"output":"stale output"}"#.to_owned()
3656 },
3657 ),
3658 )
3659 .route(
3660 "/api/v1/sessions/{id}",
3661 get(|_path: Path<String>| async {
3662 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3663 }),
3664 );
3665
3666 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3667 let addr = listener.local_addr().unwrap();
3668 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3669 let base = format!("http://127.0.0.1:{}", addr.port());
3670
3671 let client = reqwest::Client::new();
3672 let mut buf = Vec::new();
3673 follow_logs(&client, &base, "test", 100, None, &mut buf)
3674 .await
3675 .unwrap();
3676
3677 let output = String::from_utf8(buf).unwrap();
3678 assert!(output.contains("stale output"));
3679 }
3680
3681 #[tokio::test]
3682 async fn test_execute_logs_follow_non_reqwest_error() {
3683 use axum::{Router, extract::Path, extract::Query, routing::get};
3684
3685 let app = Router::new()
3687 .route(
3688 "/api/v1/sessions/{id}/output",
3689 get(
3690 |_path: Path<String>,
3691 _query: Query<std::collections::HashMap<String, String>>| async {
3692 r#"{"output":"initial"}"#.to_owned()
3693 },
3694 ),
3695 )
3696 .route(
3697 "/api/v1/sessions/{id}",
3698 get(|_path: Path<String>| async { "not valid json".to_owned() }),
3699 );
3700
3701 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3702 let addr = listener.local_addr().unwrap();
3703 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3704 let node = format!("127.0.0.1:{}", addr.port());
3705
3706 let cli = Cli {
3707 node,
3708 token: None,
3709 command: Some(Commands::Logs {
3710 name: "test".into(),
3711 lines: 100,
3712 follow: true,
3713 }),
3714 path: None,
3715 };
3716 let err = execute(&cli).await.unwrap_err();
3717 let msg = err.to_string();
3719 assert!(
3720 msg.contains("expected ident"),
3721 "Expected serde parse error, got: {msg}"
3722 );
3723 }
3724
3725 #[tokio::test]
3726 async fn test_fetch_session_status_connection_error() {
3727 let client = reqwest::Client::new();
3728 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3729 assert!(result.is_err());
3730 }
3731
3732 #[test]
3735 fn test_format_schedules_empty() {
3736 assert_eq!(format_schedules(&[]), "No schedules.");
3737 }
3738
3739 #[test]
3740 fn test_format_schedules_with_entries() {
3741 let schedules = vec![serde_json::json!({
3742 "name": "nightly",
3743 "cron": "0 3 * * *",
3744 "enabled": true,
3745 "last_run_at": null,
3746 "target_node": null
3747 })];
3748 let output = format_schedules(&schedules);
3749 assert!(output.contains("nightly"));
3750 assert!(output.contains("0 3 * * *"));
3751 assert!(output.contains("local"));
3752 assert!(output.contains("yes"));
3753 assert!(output.contains('-'));
3754 }
3755
3756 #[test]
3757 fn test_format_schedules_disabled_entry() {
3758 let schedules = vec![serde_json::json!({
3759 "name": "weekly",
3760 "cron": "0 0 * * 0",
3761 "enabled": false,
3762 "last_run_at": "2026-03-18T03:00:00Z",
3763 "target_node": "gpu-box"
3764 })];
3765 let output = format_schedules(&schedules);
3766 assert!(output.contains("weekly"));
3767 assert!(output.contains("no"));
3768 assert!(output.contains("gpu-box"));
3769 assert!(output.contains("2026-03-18T03:00"));
3770 }
3771
3772 #[test]
3773 fn test_format_schedules_header() {
3774 let schedules = vec![serde_json::json!({
3775 "name": "test",
3776 "cron": "* * * * *",
3777 "enabled": true,
3778 "last_run_at": null,
3779 "target_node": null
3780 })];
3781 let output = format_schedules(&schedules);
3782 assert!(output.contains("NAME"));
3783 assert!(output.contains("CRON"));
3784 assert!(output.contains("ENABLED"));
3785 assert!(output.contains("LAST RUN"));
3786 assert!(output.contains("NODE"));
3787 }
3788
3789 #[test]
3792 fn test_cli_parse_schedule_add() {
3793 let cli = Cli::try_parse_from([
3794 "pulpo",
3795 "schedule",
3796 "add",
3797 "nightly",
3798 "0 3 * * *",
3799 "--workdir",
3800 "/repo",
3801 "--",
3802 "claude",
3803 "-p",
3804 "review",
3805 ])
3806 .unwrap();
3807 assert!(matches!(
3808 &cli.command,
3809 Some(Commands::Schedule {
3810 action: ScheduleAction::Add { name, cron, .. }
3811 }) if name == "nightly" && cron == "0 3 * * *"
3812 ));
3813 }
3814
3815 #[test]
3816 fn test_cli_parse_schedule_add_with_node() {
3817 let cli = Cli::try_parse_from([
3818 "pulpo",
3819 "schedule",
3820 "add",
3821 "nightly",
3822 "0 3 * * *",
3823 "--workdir",
3824 "/repo",
3825 "--node",
3826 "gpu-box",
3827 "--",
3828 "claude",
3829 ])
3830 .unwrap();
3831 assert!(matches!(
3832 &cli.command,
3833 Some(Commands::Schedule {
3834 action: ScheduleAction::Add { node, .. }
3835 }) if node.as_deref() == Some("gpu-box")
3836 ));
3837 }
3838
3839 #[test]
3840 fn test_cli_parse_schedule_add_install_alias() {
3841 let cli =
3842 Cli::try_parse_from(["pulpo", "schedule", "install", "nightly", "0 3 * * *"]).unwrap();
3843 assert!(matches!(
3844 &cli.command,
3845 Some(Commands::Schedule {
3846 action: ScheduleAction::Add { name, .. }
3847 }) if name == "nightly"
3848 ));
3849 }
3850
3851 #[test]
3852 fn test_cli_parse_schedule_list() {
3853 let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3854 assert!(matches!(
3855 &cli.command,
3856 Some(Commands::Schedule {
3857 action: ScheduleAction::List
3858 })
3859 ));
3860 }
3861
3862 #[test]
3863 fn test_cli_parse_schedule_remove() {
3864 let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3865 assert!(matches!(
3866 &cli.command,
3867 Some(Commands::Schedule {
3868 action: ScheduleAction::Remove { name }
3869 }) if name == "nightly"
3870 ));
3871 }
3872
3873 #[test]
3874 fn test_cli_parse_schedule_pause() {
3875 let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3876 assert!(matches!(
3877 &cli.command,
3878 Some(Commands::Schedule {
3879 action: ScheduleAction::Pause { name }
3880 }) if name == "nightly"
3881 ));
3882 }
3883
3884 #[test]
3885 fn test_cli_parse_schedule_resume() {
3886 let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3887 assert!(matches!(
3888 &cli.command,
3889 Some(Commands::Schedule {
3890 action: ScheduleAction::Resume { name }
3891 }) if name == "nightly"
3892 ));
3893 }
3894
3895 #[test]
3896 fn test_cli_parse_schedule_alias() {
3897 let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3898 assert!(matches!(
3899 &cli.command,
3900 Some(Commands::Schedule {
3901 action: ScheduleAction::List
3902 })
3903 ));
3904 }
3905
3906 #[test]
3907 fn test_cli_parse_schedule_list_alias() {
3908 let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3909 assert!(matches!(
3910 &cli.command,
3911 Some(Commands::Schedule {
3912 action: ScheduleAction::List
3913 })
3914 ));
3915 }
3916
3917 #[test]
3918 fn test_cli_parse_schedule_remove_alias() {
3919 let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3920 assert!(matches!(
3921 &cli.command,
3922 Some(Commands::Schedule {
3923 action: ScheduleAction::Remove { name }
3924 }) if name == "nightly"
3925 ));
3926 }
3927
3928 #[tokio::test]
3929 async fn test_execute_schedule_list_via_execute() {
3930 let node = start_test_server().await;
3931 let cli = Cli {
3932 node,
3933 token: None,
3934 command: Some(Commands::Schedule {
3935 action: ScheduleAction::List,
3936 }),
3937 path: None,
3938 };
3939 let result = execute(&cli).await.unwrap();
3940 #[cfg(coverage)]
3942 assert!(result.is_empty());
3943 #[cfg(not(coverage))]
3944 assert_eq!(result, "No schedules.");
3945 }
3946
3947 #[test]
3948 fn test_schedule_action_debug() {
3949 let action = ScheduleAction::List;
3950 assert_eq!(format!("{action:?}"), "List");
3951 }
3952
3953 #[test]
3954 fn test_cli_parse_send_alias() {
3955 let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
3956 assert!(matches!(
3957 &cli.command,
3958 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
3959 ));
3960 }
3961
3962 #[test]
3963 fn test_cli_parse_spawn_no_name() {
3964 let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
3965 assert!(matches!(
3966 &cli.command,
3967 Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
3968 ));
3969 }
3970
3971 #[test]
3972 fn test_cli_parse_spawn_optional_name_with_command() {
3973 let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
3974 assert!(matches!(
3975 &cli.command,
3976 Some(Commands::Spawn { name, command, .. })
3977 if name.is_none() && command == &["echo", "hello"]
3978 ));
3979 }
3980
3981 #[test]
3982 fn test_cli_parse_path_shortcut() {
3983 let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
3984 assert!(cli.command.is_none());
3985 assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
3986 }
3987
3988 #[test]
3989 fn test_cli_parse_no_args() {
3990 let cli = Cli::try_parse_from(["pulpo"]).unwrap();
3991 assert!(cli.command.is_none());
3992 assert!(cli.path.is_none());
3993 }
3994
3995 #[test]
3996 fn test_derive_session_name_simple() {
3997 assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
3998 }
3999
4000 #[test]
4001 fn test_derive_session_name_with_special_chars() {
4002 assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
4003 }
4004
4005 #[test]
4006 fn test_derive_session_name_root() {
4007 assert_eq!(derive_session_name("/"), "session");
4008 }
4009
4010 #[test]
4011 fn test_derive_session_name_dots() {
4012 assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
4013 }
4014
4015 #[test]
4016 fn test_resolve_path_absolute() {
4017 assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
4018 }
4019
4020 #[test]
4021 fn test_resolve_path_relative() {
4022 let resolved = resolve_path("my-repo");
4023 assert!(resolved.ends_with("my-repo"));
4024 assert!(resolved.starts_with('/'));
4025 }
4026
4027 #[tokio::test]
4028 async fn test_execute_no_args_shows_help() {
4029 let node = start_test_server().await;
4030 let cli = Cli {
4031 node,
4032 token: None,
4033 path: None,
4034 command: None,
4035 };
4036 let result = execute(&cli).await.unwrap();
4037 assert!(
4038 result.is_empty(),
4039 "no-args should return empty string after printing help"
4040 );
4041 }
4042
4043 #[tokio::test]
4044 async fn test_execute_path_shortcut() {
4045 let node = start_test_server().await;
4046 let cli = Cli {
4047 node,
4048 token: None,
4049 path: Some("/tmp".into()),
4050 command: None,
4051 };
4052 let result = execute(&cli).await.unwrap();
4053 assert!(result.contains("Detached from session"));
4054 }
4055
4056 #[tokio::test]
4057 async fn test_deduplicate_session_name_no_conflict() {
4058 let base = "http://127.0.0.1:1";
4060 let client = reqwest::Client::new();
4061 let name = deduplicate_session_name(&client, base, "fresh", None).await;
4062 assert_eq!(name, "fresh");
4063 }
4064
4065 #[tokio::test]
4066 async fn test_deduplicate_session_name_with_conflict() {
4067 use axum::{Router, routing::get};
4068 use std::sync::atomic::{AtomicU32, Ordering};
4069
4070 let call_count = std::sync::Arc::new(AtomicU32::new(0));
4071 let counter = call_count.clone();
4072 let app = Router::new()
4073 .route(
4074 "/api/v1/sessions/{id}",
4075 get(move || {
4076 let c = counter.clone();
4077 async move {
4078 let n = c.fetch_add(1, Ordering::SeqCst);
4079 if n == 0 {
4080 (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4082 } else {
4083 (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4085 }
4086 }
4087 }),
4088 )
4089 .route(
4090 "/api/v1/peers",
4091 get(|| async {
4092 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4093 }),
4094 );
4095 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4096 let addr = listener.local_addr().unwrap();
4097 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4098 let base = format!("http://127.0.0.1:{}", addr.port());
4099 let client = reqwest::Client::new();
4100 let name = deduplicate_session_name(&client, &base, "repo", None).await;
4101 assert_eq!(name, "repo-2");
4102 }
4103
4104 #[test]
4107 fn test_node_needs_resolution() {
4108 assert!(!node_needs_resolution("localhost:7433"));
4109 assert!(!node_needs_resolution("mac-mini:7433"));
4110 assert!(!node_needs_resolution("10.0.0.1:7433"));
4111 assert!(!node_needs_resolution("[::1]:7433"));
4112 assert!(node_needs_resolution("mac-mini"));
4113 assert!(node_needs_resolution("linux-server"));
4114 assert!(node_needs_resolution("localhost"));
4115 }
4116
4117 #[tokio::test]
4118 async fn test_resolve_node_with_port() {
4119 let client = reqwest::Client::new();
4120 let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4121 assert_eq!(addr, "mac-mini:7433");
4122 assert!(token.is_none());
4123 }
4124
4125 #[tokio::test]
4126 async fn test_resolve_node_fallback_appends_port() {
4127 let client = reqwest::Client::new();
4130 let (addr, token) = resolve_node(&client, "unknown-host").await;
4131 assert_eq!(addr, "unknown-host:7433");
4132 assert!(token.is_none());
4133 }
4134
4135 #[cfg(not(coverage))]
4136 #[tokio::test]
4137 async fn test_resolve_node_finds_peer() {
4138 use axum::{Router, routing::get};
4139
4140 let app = Router::new()
4141 .route(
4142 "/api/v1/peers",
4143 get(|| async {
4144 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4145 }),
4146 )
4147 .route(
4148 "/api/v1/config",
4149 get(|| async {
4150 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4151 }),
4152 );
4153
4154 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4156 return;
4157 };
4158 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4159
4160 let client = reqwest::Client::new();
4161 let (addr, token) = resolve_node(&client, "mac-mini").await;
4162 assert_eq!(addr, "10.0.0.5:7433");
4163 assert_eq!(token, Some("peer-secret".into()));
4164 }
4165
4166 #[cfg(not(coverage))]
4167 #[tokio::test]
4168 async fn test_resolve_node_peer_no_token() {
4169 use axum::{Router, routing::get};
4170
4171 let app = Router::new()
4172 .route(
4173 "/api/v1/peers",
4174 get(|| async {
4175 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4176 }),
4177 )
4178 .route(
4179 "/api/v1/config",
4180 get(|| async {
4181 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4182 }),
4183 );
4184
4185 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4186 return; };
4188 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4189
4190 let client = reqwest::Client::new();
4191 let (addr, token) = resolve_node(&client, "test-peer").await;
4192 assert_eq!(addr, "10.0.0.9:7433");
4193 assert!(token.is_none()); }
4195
4196 #[tokio::test]
4197 async fn test_execute_with_peer_name_resolution() {
4198 let cli = Cli {
4202 node: "nonexistent-peer".into(),
4203 token: None,
4204 command: Some(Commands::List { all: false }),
4205 path: None,
4206 };
4207 let result = execute(&cli).await;
4208 assert!(result.is_err());
4210 }
4211
4212 #[test]
4215 fn test_cli_parse_spawn_auto() {
4216 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
4217 assert!(matches!(
4218 &cli.command,
4219 Some(Commands::Spawn { auto, .. }) if *auto
4220 ));
4221 }
4222
4223 #[test]
4224 fn test_cli_parse_spawn_auto_default() {
4225 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
4226 assert!(matches!(
4227 &cli.command,
4228 Some(Commands::Spawn { auto, .. }) if !auto
4229 ));
4230 }
4231
4232 #[tokio::test]
4233 async fn test_select_best_node_coverage_stub() {
4234 let client = reqwest::Client::new();
4236 let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4239 }
4240
4241 #[cfg(not(coverage))]
4242 #[tokio::test]
4243 async fn test_select_best_node_picks_least_loaded() {
4244 use axum::{Router, routing::get};
4245
4246 let app = Router::new().route(
4247 "/api/v1/peers",
4248 get(|| async {
4249 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4250 }),
4251 );
4252 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4253 let addr = listener.local_addr().unwrap();
4254 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4255 let base = format!("http://127.0.0.1:{}", addr.port());
4256
4257 let client = reqwest::Client::new();
4258 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4259 assert_eq!(name, "idle");
4263 assert_eq!(addr, "idle:7433");
4264 }
4265
4266 #[cfg(not(coverage))]
4267 #[tokio::test]
4268 async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4269 use axum::{Router, routing::get};
4270
4271 let app = Router::new().route(
4272 "/api/v1/peers",
4273 get(|| async {
4274 r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4275 }),
4276 );
4277 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4278 let addr = listener.local_addr().unwrap();
4279 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4280 let base = format!("http://127.0.0.1:{}", addr.port());
4281
4282 let client = reqwest::Client::new();
4283 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4284 assert_eq!(name, "my-mac");
4285 assert_eq!(addr, "localhost:7433");
4286 }
4287
4288 #[cfg(not(coverage))]
4289 #[tokio::test]
4290 async fn test_select_best_node_empty_peers_falls_back_to_local() {
4291 use axum::{Router, routing::get};
4292
4293 let app = Router::new().route(
4294 "/api/v1/peers",
4295 get(|| async {
4296 r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4297 }),
4298 );
4299 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4300 let addr = listener.local_addr().unwrap();
4301 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4302 let base = format!("http://127.0.0.1:{}", addr.port());
4303
4304 let client = reqwest::Client::new();
4305 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4306 assert_eq!(name, "solo");
4307 assert_eq!(addr, "localhost:7433");
4308 }
4309
4310 #[cfg(not(coverage))]
4311 #[tokio::test]
4312 async fn test_execute_spawn_auto_selects_node() {
4313 use axum::{
4314 Router,
4315 http::StatusCode,
4316 routing::{get, post},
4317 };
4318
4319 let create_json = test_create_response_json();
4320
4321 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4323 let addr = listener.local_addr().unwrap();
4324 let node = format!("127.0.0.1:{}", addr.port());
4325 let peer_addr = node.clone();
4326
4327 let app = Router::new()
4328 .route(
4329 "/api/v1/peers",
4330 get(move || {
4331 let peer_addr = peer_addr.clone();
4332 async move {
4333 format!(
4334 r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
4335 )
4336 }
4337 }),
4338 )
4339 .route(
4340 "/api/v1/sessions",
4341 post(move || async move {
4342 (StatusCode::CREATED, create_json.clone())
4343 }),
4344 );
4345
4346 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4347
4348 let cli = Cli {
4349 node,
4350 token: None,
4351 command: Some(Commands::Spawn {
4352 name: Some("test".into()),
4353 workdir: Some("/tmp/repo".into()),
4354 ink: None,
4355 description: None,
4356 detach: true,
4357 idle_threshold: None,
4358 auto: true,
4359 worktree: false,
4360 runtime: None,
4361 secret: vec![],
4362 command: vec!["echo".into(), "hello".into()],
4363 }),
4364 path: None,
4365 };
4366 let result = execute(&cli).await.unwrap();
4367 assert!(result.contains("Created session"));
4368 }
4369
4370 #[cfg(not(coverage))]
4371 #[tokio::test]
4372 async fn test_select_best_node_peer_no_session_count() {
4373 use axum::{Router, routing::get};
4374
4375 let app = Router::new().route(
4376 "/api/v1/peers",
4377 get(|| async {
4378 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4379 }),
4380 );
4381 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4382 let addr = listener.local_addr().unwrap();
4383 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4384 let base = format!("http://127.0.0.1:{}", addr.port());
4385
4386 let client = reqwest::Client::new();
4387 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4388 assert_eq!(name, "fresh");
4390 assert_eq!(addr, "fresh:7433");
4391 }
4392
4393 #[test]
4396 fn test_cli_parse_secret_set() {
4397 let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_TOKEN", "abc123"]).unwrap();
4398 assert!(matches!(
4399 &cli.command,
4400 Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
4401 if name == "MY_TOKEN" && value == "abc123" && env.is_none()
4402 ));
4403 }
4404
4405 #[test]
4406 fn test_cli_parse_secret_list() {
4407 let cli = Cli::try_parse_from(["pulpo", "secret", "list"]).unwrap();
4408 assert!(matches!(
4409 &cli.command,
4410 Some(Commands::Secret {
4411 action: SecretAction::List
4412 })
4413 ));
4414 }
4415
4416 #[test]
4417 fn test_cli_parse_secret_list_alias() {
4418 let cli = Cli::try_parse_from(["pulpo", "secret", "ls"]).unwrap();
4419 assert!(matches!(
4420 &cli.command,
4421 Some(Commands::Secret {
4422 action: SecretAction::List
4423 })
4424 ));
4425 }
4426
4427 #[test]
4428 fn test_cli_parse_secret_delete() {
4429 let cli = Cli::try_parse_from(["pulpo", "secret", "delete", "MY_TOKEN"]).unwrap();
4430 assert!(matches!(
4431 &cli.command,
4432 Some(Commands::Secret { action: SecretAction::Delete { name } })
4433 if name == "MY_TOKEN"
4434 ));
4435 }
4436
4437 #[test]
4438 fn test_cli_parse_secret_delete_alias() {
4439 let cli = Cli::try_parse_from(["pulpo", "secret", "rm", "MY_TOKEN"]).unwrap();
4440 assert!(matches!(
4441 &cli.command,
4442 Some(Commands::Secret { action: SecretAction::Delete { name } })
4443 if name == "MY_TOKEN"
4444 ));
4445 }
4446
4447 #[test]
4448 fn test_cli_parse_secret_alias() {
4449 let cli = Cli::try_parse_from(["pulpo", "sec", "list"]).unwrap();
4450 assert!(matches!(
4451 &cli.command,
4452 Some(Commands::Secret {
4453 action: SecretAction::List
4454 })
4455 ));
4456 }
4457
4458 #[test]
4459 fn test_format_secrets_empty() {
4460 let secrets: Vec<serde_json::Value> = vec![];
4461 assert_eq!(format_secrets(&secrets), "No secrets configured.");
4462 }
4463
4464 #[test]
4465 fn test_format_secrets_with_entries() {
4466 let secrets = vec![
4467 serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4468 serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4469 ];
4470 let output = format_secrets(&secrets);
4471 assert!(output.contains("GITHUB_TOKEN"));
4472 assert!(output.contains("NPM_TOKEN"));
4473 assert!(output.contains("NAME"));
4474 assert!(output.contains("ENV"));
4475 assert!(output.contains("CREATED"));
4476 }
4477
4478 #[test]
4479 fn test_format_secrets_with_env() {
4480 let secrets = vec![
4481 serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
4482 serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
4483 ];
4484 let output = format_secrets(&secrets);
4485 assert!(output.contains("GH_WORK"));
4486 assert!(output.contains("GITHUB_TOKEN"));
4487 assert!(output.contains("NPM_TOKEN"));
4488 }
4489
4490 #[test]
4491 fn test_format_secrets_short_timestamp() {
4492 let secrets = vec![serde_json::json!({"name": "KEY", "created_at": "now"})];
4493 let output = format_secrets(&secrets);
4494 assert!(output.contains("now"));
4495 }
4496
4497 #[test]
4498 fn test_format_schedules_short_last_run_at() {
4499 let schedules = vec![serde_json::json!({
4501 "name": "test",
4502 "cron": "* * * * *",
4503 "enabled": true,
4504 "last_run_at": "short",
4505 "target_node": null
4506 })];
4507 let output = format_schedules(&schedules);
4508 assert!(output.contains("short"));
4509 }
4510
4511 #[test]
4512 fn test_format_sessions_multibyte_command_truncation() {
4513 use chrono::Utc;
4514 use pulpo_common::session::SessionStatus;
4515 use uuid::Uuid;
4516
4517 let sessions = vec![Session {
4519 id: Uuid::nil(),
4520 name: "test".into(),
4521 workdir: "/tmp".into(),
4522 command: "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'".into(),
4523 description: None,
4524 status: SessionStatus::Active,
4525 exit_code: None,
4526 backend_session_id: None,
4527 output_snapshot: None,
4528 metadata: None,
4529 ink: None,
4530 intervention_code: None,
4531 intervention_reason: None,
4532 intervention_at: None,
4533 last_output_at: None,
4534 idle_since: None,
4535 idle_threshold_secs: None,
4536 worktree_path: None,
4537 runtime: Runtime::Tmux,
4538 created_at: Utc::now(),
4539 updated_at: Utc::now(),
4540 }];
4541 let output = format_sessions(&sessions);
4542 assert!(output.contains("..."));
4543 }
4544}