1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5 AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6 PeersResponse,
7};
8use pulpo_common::session::{Runtime, Session};
9
// Top-level command-line arguments for the `pulpo` binary, parsed by clap.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn `///`
// doc comments into help text, which would change the program's output.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version = env!("PULPO_VERSION"),
    args_conflicts_with_subcommands = true
)]
pub struct Cli {
    // Node to talk to. A value containing ':' is used as an address as-is; a bare
    // name is looked up among the local daemon's peers (see `resolve_node`).
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    // Explicit bearer token. When present it takes precedence over any discovered
    // or peer-configured token (see `resolve_token`).
    #[arg(long)]
    pub token: Option<String>,

    // Subcommand to run. When absent, `execute` either prints help (no `path`) or
    // creates a session for `path` and attaches to it.
    #[command(subcommand)]
    pub command: Option<Commands>,

    // Working directory for the quick "create and attach" flow that `execute`
    // runs when no subcommand is given.
    #[arg(value_name = "PATH")]
    pub path: Option<String>,
}
33
// Subcommands of the `pulpo` CLI; `execute` dispatches on these variants.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn `///`
// doc comments into help text, which would change the program's output.
#[derive(Subcommand, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    // Attach the terminal to an existing session. `execute` refuses to attach when
    // the session status is "lost" or "stopped".
    #[command(visible_alias = "a")]
    Attach {
        // Session name used in the `/api/v1/sessions/{name}` lookup.
        name: String,
    },

    // Send text to a session through the `/input` endpoint.
    #[command(visible_alias = "i", visible_alias = "send")]
    Input {
        // Target session name.
        name: String,
        // Text to send; `execute` sends a single newline when omitted.
        text: Option<String>,
    },

    // Create a new session and, unless `--detach` is given, attach to it.
    #[command(visible_alias = "s")]
    Spawn {
        // Session name; when omitted it is derived from the working directory and
        // de-duplicated against existing sessions.
        name: Option<String>,

        // Working directory; defaults to the current directory.
        #[arg(long)]
        workdir: Option<String>,

        // Forwarded as the "ink" field of the create-session request.
        #[arg(long)]
        ink: Option<String>,

        // Forwarded as the "description" field of the create-session request.
        #[arg(long)]
        description: Option<String>,

        // Do not attach after creating the session.
        #[arg(short, long)]
        detach: bool,

        // Forwarded as "idle_threshold_secs".
        #[arg(long)]
        idle_threshold: Option<u32>,

        // Let `select_best_node` choose the node that receives the session.
        #[arg(long)]
        auto: bool,

        // Forwarded as `"worktree": true`.
        #[arg(long)]
        worktree: bool,

        // Forwarded as the "runtime" field of the create-session request.
        #[arg(long)]
        runtime: Option<String>,

        // Repeatable; forwarded as the "secrets" array.
        #[arg(long)]
        secret: Vec<String>,

        // Everything after `--`; joined with single spaces into the "command" field.
        #[arg(last = true)]
        command: Vec<String>,
    },

    // List sessions.
    #[command(visible_alias = "ls")]
    List {
        // Without this flag only creating/active/idle/ready sessions are requested.
        #[arg(short, long)]
        all: bool,
    },

    // Print (or follow) the captured output of a session.
    #[command(visible_alias = "l")]
    Logs {
        // Session name.
        name: String,

        // Number of output lines requested from the daemon.
        #[arg(long, default_value = "100")]
        lines: usize,

        // Keep polling and print new output (see `follow_logs`).
        #[arg(short, long)]
        follow: bool,
    },

    // Stop one or more sessions. "kill" is accepted as a hidden alias.
    #[command(visible_alias = "k", alias = "kill")]
    Stop {
        // Names of the sessions to stop; at least one is required.
        #[arg(required = true)]
        names: Vec<String>,

        // Adds `?purge=true` to the stop request.
        #[arg(long, short = 'p')]
        purge: bool,
    },

    // Ask the daemon to clean up stopped or lost sessions.
    Cleanup,

    // Resume a session and attach to it.
    #[command(visible_alias = "r")]
    Resume {
        // Session name.
        name: String,
    },

    // List the local node and its peers.
    #[command(visible_alias = "n")]
    Nodes,

    // List intervention events recorded for a session.
    #[command(visible_alias = "iv")]
    Interventions {
        // Session name.
        name: String,
    },

    // Open the node's base URL in the default browser.
    Ui,

    // Schedule management; see `ScheduleAction`.
    #[command(visible_alias = "sched")]
    Schedule {
        #[command(subcommand)]
        action: ScheduleAction,
    },

    // Secret management; see `SecretAction`.
    #[command(visible_alias = "sec")]
    Secret {
        #[command(subcommand)]
        action: SecretAction,
    },
}
173
// Actions of the `secret` subcommand, carried out by `execute_secret`.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn `///`
// doc comments into help text, which would change the program's output.
#[derive(Subcommand, Debug)]
pub enum SecretAction {
    // Create or overwrite a secret (PUT `/api/v1/secrets/{name}`).
    Set {
        // Secret name, used as the last URL path segment.
        name: String,
        // Secret value, sent as the "value" field.
        value: String,
        // Optional "env" field of the request body.
        #[arg(long)]
        env: Option<String>,
    },
    // List stored secrets.
    #[command(visible_alias = "ls")]
    List,
    // Delete a secret by name.
    #[command(visible_alias = "rm")]
    Delete {
        // Name of the secret to delete.
        name: String,
    },
}
196
// Actions of the `schedule` subcommand, carried out by `execute_schedule`.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn `///`
// doc comments into help text, which would change the program's output.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    // Create a schedule (POST `/api/v1/schedules`). "install" is a hidden alias.
    #[command(alias = "install")]
    Add {
        // Schedule name.
        name: String,
        // Cron expression, sent as the "cron" field.
        cron: String,
        // Working directory; defaults to the current directory.
        #[arg(long)]
        workdir: Option<String>,
        // Sent as "target_node" when given.
        #[arg(long)]
        node: Option<String>,
        // Sent as "ink" when given.
        #[arg(long)]
        ink: Option<String>,
        // Sent as "description" when given.
        #[arg(long)]
        description: Option<String>,
        // Everything after `--`; joined with single spaces into the "command" field.
        #[arg(last = true)]
        command: Vec<String>,
    },
    // List schedules. "ls" is a hidden alias.
    #[command(alias = "ls")]
    List,
    // Delete a schedule. "rm" is a hidden alias.
    #[command(alias = "rm")]
    Remove {
        // Name of the schedule to delete.
        name: String,
    },
    // Disable a schedule (PUT with `"enabled": false`).
    Pause {
        // Name of the schedule to pause.
        name: String,
    },
    // Re-enable a schedule (PUT with `"enabled": true`).
    Resume {
        // Name of the schedule to resume.
        name: String,
    },
}
242
/// Text whose presence in fetched session output makes `follow_logs` stop polling.
const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
245
/// Makes `path` absolute by joining it onto the current working directory.
///
/// Absolute paths are returned unchanged. If the current directory cannot be
/// determined, the input is returned as given.
fn resolve_path(path: &str) -> String {
    let candidate = std::path::Path::new(path);
    if !candidate.is_absolute() {
        if let Ok(cwd) = std::env::current_dir() {
            return cwd.join(candidate).to_string_lossy().into_owned();
        }
    }
    path.to_owned()
}
258
/// Derives a kebab-case session name from the last component of `path`.
///
/// ASCII letters and digits are kept (lower-cased); every other run of
/// characters becomes a single `-`, and leading/trailing dashes are dropped.
/// Falls back to `"session"` when there is no usable file name or nothing is
/// left after cleaning.
fn derive_session_name(path: &str) -> String {
    let basename = std::path::Path::new(path)
        .file_name()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("session");

    let mut slug = String::with_capacity(basename.len());
    for ch in basename.chars() {
        if ch.is_ascii_alphanumeric() {
            slug.push(ch.to_ascii_lowercase());
        } else if !slug.is_empty() && !slug.ends_with('-') {
            // Separator: never leading, never doubled.
            slug.push('-');
        }
    }
    // At most one trailing separator can remain.
    if slug.ends_with('-') {
        slug.pop();
    }

    if slug.is_empty() {
        "session".to_owned()
    } else {
        slug
    }
}
291
292async fn deduplicate_session_name(
294 client: &reqwest::Client,
295 base: &str,
296 name: &str,
297 token: Option<&str>,
298) -> String {
299 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
301 .send()
302 .await;
303 match resp {
304 Ok(r) if r.status().is_success() => {
305 for i in 2..=99 {
307 let candidate = format!("{name}-{i}");
308 let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
309 .send()
310 .await;
311 match resp {
312 Ok(r) if r.status().is_success() => {}
313 _ => return candidate,
314 }
315 }
316 format!("{name}-100")
317 }
318 _ => name.to_owned(),
319 }
320}
321
/// Builds the plain-HTTP base URL for a node address such as `host:port`.
pub fn base_url(node: &str) -> String {
    let mut url = String::from("http://");
    url.push_str(node);
    url
}
326
/// JSON body returned by the session output endpoint, as read by `fetch_output`.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// Captured session output text.
    output: String,
}
332
333const fn session_runtime(session: &Session) -> &'static str {
335 match session.runtime {
336 Runtime::Tmux => "tmux",
337 Runtime::Docker => "docker",
338 }
339}
340
341fn format_sessions(sessions: &[Session]) -> String {
342 if sessions.is_empty() {
343 return "No sessions.".into();
344 }
345 let mut lines = vec![format!(
346 "{:<10} {:<24} {:<12} {:<8} {}",
347 "ID", "NAME", "STATUS", "RUNTIME", "COMMAND"
348 )];
349 for s in sessions {
350 let cmd_display = if s.command.len() > 50 {
351 let truncated: String = s.command.chars().take(47).collect();
352 format!("{truncated}...")
353 } else {
354 s.command.clone()
355 };
356 let short_id = &s.id.to_string()[..8];
357 let has_pr = s
358 .metadata
359 .as_ref()
360 .is_some_and(|m| m.contains_key("pr_url"));
361 let name_display = match (s.worktree_path.is_some(), has_pr) {
362 (true, true) => format!("{} [wt] [PR]", s.name),
363 (true, false) => format!("{} [wt]", s.name),
364 (false, true) => format!("{} [PR]", s.name),
365 (false, false) => s.name.clone(),
366 };
367 lines.push(format!(
368 "{:<10} {:<24} {:<12} {:<8} {}",
369 short_id,
370 name_display,
371 s.status,
372 session_runtime(s),
373 cmd_display
374 ));
375 }
376 lines.join("\n")
377}
378
379fn format_nodes(resp: &PeersResponse) -> String {
381 let mut lines = vec![format!(
382 "{:<20} {:<25} {:<10} {}",
383 "NAME", "ADDRESS", "STATUS", "SESSIONS"
384 )];
385 lines.push(format!(
386 "{:<20} {:<25} {:<10} {}",
387 resp.local.name, "(local)", "online", "-"
388 ));
389 for p in &resp.peers {
390 let sessions = p
391 .session_count
392 .map_or_else(|| "-".into(), |c| c.to_string());
393 lines.push(format!(
394 "{:<20} {:<25} {:<10} {}",
395 p.name, p.address, p.status, sessions
396 ));
397 }
398 lines.join("\n")
399}
400
401fn format_interventions(events: &[InterventionEventResponse]) -> String {
403 if events.is_empty() {
404 return "No intervention events.".into();
405 }
406 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
407 for e in events {
408 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
409 }
410 lines.join("\n")
411}
412
/// Builds (without running) the platform command that opens `url` in the
/// default browser: `open` on macOS, `cmd /C start` on Windows, and `xdg-open`
/// on Linux and every other target.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    #[cfg(target_os = "macos")]
    let mut opener = std::process::Command::new("open");

    #[cfg(target_os = "windows")]
    let mut opener = std::process::Command::new("cmd");
    #[cfg(target_os = "windows")]
    opener.args(["/C", "start"]);

    // Linux and any unrecognised target share the same opener.
    #[cfg(not(any(target_os = "macos", target_os = "windows")))]
    let mut opener = std::process::Command::new("xdg-open");

    opener.arg(url);
    opener
}
442
443#[cfg(not(coverage))]
445fn open_browser(url: &str) -> Result<()> {
446 build_open_command(url).status()?;
447 Ok(())
448}
449
/// Coverage-build stand-in for `open_browser`: launches nothing and always succeeds.
#[cfg(coverage)]
fn open_browser(_url: &str) -> Result<()> {
    Ok(())
}
455
/// Builds (without running) the command that attaches the terminal to a backend session.
///
/// Ids of the form `docker:<container>` become `docker exec -it <container> /bin/sh`.
/// Any other id becomes `tmux attach-session -t <id>`, except on Windows where a
/// `cmd /C echo` command printing an explanatory message is returned instead.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    match backend_session_id.strip_prefix("docker:") {
        Some(container) => {
            let mut docker = std::process::Command::new("docker");
            docker.args(["exec", "-it", container, "/bin/sh"]);
            docker
        }
        None => {
            #[cfg(not(target_os = "windows"))]
            let (program, args) = ("tmux", ["attach-session", "-t", backend_session_id]);
            #[cfg(target_os = "windows")]
            let (program, args) = (
                "cmd",
                [
                    "/C",
                    "echo",
                    "Attach not available on Windows. Use the web UI or --runtime docker.",
                ],
            );
            let mut fallback = std::process::Command::new(program);
            fallback.args(args);
            fallback
        }
    }
}
485
486#[cfg(not(any(test, coverage, target_os = "windows")))]
488fn attach_session(backend_session_id: &str) -> Result<()> {
489 let status = build_attach_command(backend_session_id).status()?;
490 if !status.success() {
491 anyhow::bail!("attach failed with {status}");
492 }
493 Ok(())
494}
495
/// Windows variant of `attach_session`: prints a notice to stderr instead of
/// attaching, and reports success.
#[cfg(all(target_os = "windows", not(test), not(coverage)))]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
    Ok(())
}
502
/// Test/coverage stand-in for `attach_session`: runs no command and always succeeds.
#[cfg(any(test, coverage))]
#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    Ok(())
}
509
510fn api_error(text: &str) -> anyhow::Error {
512 serde_json::from_str::<serde_json::Value>(text)
513 .ok()
514 .and_then(|v| v["error"].as_str().map(String::from))
515 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
516}
517
518async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
520 if resp.status().is_success() {
521 Ok(resp.text().await?)
522 } else {
523 let text = resp.text().await?;
524 Err(api_error(&text))
525 }
526}
527
528fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
530 if err.is_connect() {
531 anyhow::anyhow!(
532 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
533 )
534 } else {
535 anyhow::anyhow!("Network error connecting to {node}: {err}")
536 }
537}
538
/// Reports whether `node` (optionally `host:port`) refers to the local machine:
/// `localhost`, `127.0.0.1`, bare `::1`, or anything starting with `[::1]`.
fn is_localhost(node: &str) -> bool {
    // Everything before the first ':' (or the whole string when there is none).
    let host = node.split_once(':').map_or(node, |(head, _)| head);
    matches!(host, "localhost" | "127.0.0.1") || node == "::1" || node.starts_with("[::1]")
}
544
545async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
547 let resp = client
548 .get(format!("{base}/api/v1/auth/token"))
549 .send()
550 .await
551 .ok()?;
552 let body: AuthTokenResponse = resp.json().await.ok()?;
553 if body.token.is_empty() {
554 None
555 } else {
556 Some(body.token)
557 }
558}
559
560async fn resolve_token(
562 client: &reqwest::Client,
563 base: &str,
564 node: &str,
565 explicit: Option<&str>,
566) -> Option<String> {
567 if let Some(t) = explicit {
568 return Some(t.to_owned());
569 }
570 if is_localhost(node) {
571 return discover_token(client, base).await;
572 }
573 None
574}
575
/// Reports whether `node` is a bare name (no `:` anywhere) that must be
/// resolved to an address before use.
fn node_needs_resolution(node: &str) -> bool {
    node.chars().all(|ch| ch != ':')
}
580
581#[cfg(not(coverage))]
588async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
589 if !node_needs_resolution(node) {
591 return (node.to_owned(), None);
592 }
593
594 let local_base = "http://localhost:7433";
596 let mut resolved_address: Option<String> = None;
597
598 if let Ok(resp) = client
599 .get(format!("{local_base}/api/v1/peers"))
600 .send()
601 .await
602 && let Ok(peers_resp) = resp.json::<PeersResponse>().await
603 {
604 for peer in &peers_resp.peers {
605 if peer.name == node {
606 resolved_address = Some(peer.address.clone());
607 break;
608 }
609 }
610 }
611
612 let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
613
614 let peer_token = if let Ok(resp) = client
616 .get(format!("{local_base}/api/v1/config"))
617 .send()
618 .await
619 && let Ok(config) = resp.json::<ConfigResponse>().await
620 && let Some(entry) = config.peers.get(node)
621 {
622 entry.token().map(String::from)
623 } else {
624 None
625 };
626
627 (address, peer_token)
628}
629
/// Coverage-build variant of `resolve_node`: performs no network lookups.
/// Bare names get the default port `7433` appended; addresses pass through.
#[cfg(coverage)]
async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
    let address = if node_needs_resolution(node) {
        format!("{node}:7433")
    } else {
        node.to_owned()
    };
    (address, None)
}
639
640fn authed_get(
642 client: &reqwest::Client,
643 url: String,
644 token: Option<&str>,
645) -> reqwest::RequestBuilder {
646 let req = client.get(url);
647 if let Some(t) = token {
648 req.bearer_auth(t)
649 } else {
650 req
651 }
652}
653
654fn authed_post(
656 client: &reqwest::Client,
657 url: String,
658 token: Option<&str>,
659) -> reqwest::RequestBuilder {
660 let req = client.post(url);
661 if let Some(t) = token {
662 req.bearer_auth(t)
663 } else {
664 req
665 }
666}
667
668fn authed_delete(
670 client: &reqwest::Client,
671 url: String,
672 token: Option<&str>,
673) -> reqwest::RequestBuilder {
674 let req = client.delete(url);
675 if let Some(t) = token {
676 req.bearer_auth(t)
677 } else {
678 req
679 }
680}
681
682#[cfg(not(coverage))]
684fn authed_put(
685 client: &reqwest::Client,
686 url: String,
687 token: Option<&str>,
688) -> reqwest::RequestBuilder {
689 let req = client.put(url);
690 if let Some(t) = token {
691 req.bearer_auth(t)
692 } else {
693 req
694 }
695}
696
697async fn fetch_output(
699 client: &reqwest::Client,
700 base: &str,
701 name: &str,
702 lines: usize,
703 token: Option<&str>,
704) -> Result<String> {
705 let resp = authed_get(
706 client,
707 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
708 token,
709 )
710 .send()
711 .await?;
712 let text = ok_or_api_error(resp).await?;
713 let output: OutputResponse = serde_json::from_str(&text)?;
714 Ok(output.output)
715}
716
717async fn fetch_session_status(
719 client: &reqwest::Client,
720 base: &str,
721 name: &str,
722 token: Option<&str>,
723) -> Result<String> {
724 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
725 .send()
726 .await?;
727 let text = ok_or_api_error(resp).await?;
728 let session: Session = serde_json::from_str(&text)?;
729 Ok(session.status.to_string())
730}
731
732async fn check_session_alive(
736 client: &reqwest::Client,
737 base: &str,
738 session_id: &str,
739 token: Option<&str>,
740) -> Result<()> {
741 for _ in 0..3 {
743 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
744 let resp = authed_get(
746 client,
747 format!("{base}/api/v1/sessions/{session_id}"),
748 token,
749 )
750 .send()
751 .await;
752 if let Ok(resp) = resp
753 && let Ok(text) = ok_or_api_error(resp).await
754 && let Ok(session) = serde_json::from_str::<Session>(&text)
755 {
756 match session.status.to_string().as_str() {
757 "creating" => continue,
758 "lost" | "stopped" => {
759 anyhow::bail!(
760 "Session \"{}\" exited immediately — the command may have failed.\n Check logs: pulpo logs {}",
761 session.name,
762 session.name
763 );
764 }
765 _ => return Ok(()),
766 }
767 }
768 break;
770 }
771 Ok(())
772}
773
/// Returns the part of `new` that comes after the content already seen in `prev`.
///
/// Both inputs are snapshots of the tail of the same growing output. The
/// function searches `new`, from the end, for the last line of `prev` and
/// checks that the lines leading up to it match the tail of `prev`. On a match
/// it returns everything in `new` after that line (or `""` if nothing follows).
/// When `prev` is empty, or no overlap is found, all of `new` is returned.
///
/// Works for both `\n` and `\r\n` line endings: the returned slice always
/// starts at the beginning of the first unseen line.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // A non-empty string always yields at least one line, so the fallback is
    // defensive only.
    let last_prev = match prev_lines.last() {
        Some(line) => *line,
        None => return new,
    };

    // Scan from the end so the most recent occurrence of the anchor line wins.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail != new_overlap {
            continue;
        }
        if i + 1 >= new_lines.len() {
            return "";
        }
        // `lines()` is `split_inclusive('\n')` with the terminator stripped, so
        // the first `i + 1` inclusive pieces give the exact byte length of the
        // overlap, whatever the line ending is.
        let consumed: usize = new
            .split_inclusive('\n')
            .take(i + 1)
            .map(str::len)
            .sum();
        return new.get(consumed..).unwrap_or("");
    }

    new
}
816
817async fn follow_logs(
819 client: &reqwest::Client,
820 base: &str,
821 name: &str,
822 lines: usize,
823 token: Option<&str>,
824 writer: &mut (dyn std::io::Write + Send),
825) -> Result<()> {
826 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
827 write!(writer, "{prev_output}")?;
828
829 let mut unchanged_ticks: u32 = 0;
830
831 loop {
832 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
833
834 let new_output = fetch_output(client, base, name, lines, token).await?;
836
837 let diff = diff_output(&prev_output, &new_output);
838 if diff.is_empty() {
839 unchanged_ticks += 1;
840 } else {
841 write!(writer, "{diff}")?;
842 unchanged_ticks = 0;
843 }
844
845 if new_output.contains(AGENT_EXIT_MARKER) {
847 break;
848 }
849
850 prev_output = new_output;
851
852 if unchanged_ticks >= 3 {
854 let status = fetch_session_status(client, base, name, token).await?;
855 let is_terminal = status == "ready" || status == "stopped" || status == "lost";
856 if is_terminal {
857 break;
858 }
859 }
860 }
861 Ok(())
862}
863
864#[cfg(not(coverage))]
868async fn execute_schedule(
869 client: &reqwest::Client,
870 action: &ScheduleAction,
871 base: &str,
872 token: Option<&str>,
873) -> Result<String> {
874 match action {
875 ScheduleAction::Add {
876 name,
877 cron,
878 workdir,
879 node,
880 ink,
881 description,
882 command,
883 } => {
884 let cmd = if command.is_empty() {
885 None
886 } else {
887 Some(command.join(" "))
888 };
889 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
890 std::env::current_dir()
891 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
892 });
893 let mut body = serde_json::json!({
894 "name": name,
895 "cron": cron,
896 "workdir": resolved_workdir,
897 });
898 if let Some(c) = &cmd {
899 body["command"] = serde_json::json!(c);
900 }
901 if let Some(n) = node {
902 body["target_node"] = serde_json::json!(n);
903 }
904 if let Some(i) = ink {
905 body["ink"] = serde_json::json!(i);
906 }
907 if let Some(d) = description {
908 body["description"] = serde_json::json!(d);
909 }
910 let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
911 .json(&body)
912 .send()
913 .await?;
914 ok_or_api_error(resp).await?;
915 Ok(format!("Created schedule \"{name}\""))
916 }
917 ScheduleAction::List => {
918 let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
919 .send()
920 .await?;
921 let text = ok_or_api_error(resp).await?;
922 let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
923 Ok(format_schedules(&schedules))
924 }
925 ScheduleAction::Remove { name } => {
926 let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
927 .send()
928 .await?;
929 ok_or_api_error(resp).await?;
930 Ok(format!("Removed schedule \"{name}\""))
931 }
932 ScheduleAction::Pause { name } => {
933 let body = serde_json::json!({ "enabled": false });
934 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
935 .json(&body)
936 .send()
937 .await?;
938 ok_or_api_error(resp).await?;
939 Ok(format!("Paused schedule \"{name}\""))
940 }
941 ScheduleAction::Resume { name } => {
942 let body = serde_json::json!({ "enabled": true });
943 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
944 .json(&body)
945 .send()
946 .await?;
947 ok_or_api_error(resp).await?;
948 Ok(format!("Resumed schedule \"{name}\""))
949 }
950 }
951}
952
/// Coverage-build stand-in for `execute_schedule`: makes no requests and returns
/// an empty message.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_schedule(
    _client: &reqwest::Client,
    _action: &ScheduleAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::new())
}
964
965#[cfg_attr(coverage, allow(dead_code))]
969fn format_secrets(secrets: &[serde_json::Value]) -> String {
970 if secrets.is_empty() {
971 return "No secrets configured.".into();
972 }
973 let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
974 for s in secrets {
975 let name = s["name"].as_str().unwrap_or("?");
976 let env_display = s["env"]
977 .as_str()
978 .map_or_else(|| name.to_owned(), String::from);
979 let created = s["created_at"]
980 .as_str()
981 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
982 lines.push(format!("{name:<24} {env_display:<24} {created}"));
983 }
984 lines.join("\n")
985}
986
987#[cfg(not(coverage))]
989async fn execute_secret(
990 client: &reqwest::Client,
991 action: &SecretAction,
992 base: &str,
993 token: Option<&str>,
994) -> Result<String> {
995 match action {
996 SecretAction::Set { name, value, env } => {
997 let mut body = serde_json::json!({ "value": value });
998 if let Some(e) = env {
999 body["env"] = serde_json::json!(e);
1000 }
1001 let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
1002 .json(&body)
1003 .send()
1004 .await?;
1005 ok_or_api_error(resp).await?;
1006 Ok(format!("Secret \"{name}\" set."))
1007 }
1008 SecretAction::List => {
1009 let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
1010 .send()
1011 .await?;
1012 let text = ok_or_api_error(resp).await?;
1013 let parsed: serde_json::Value = serde_json::from_str(&text)?;
1014 let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1015 Ok(format_secrets(secrets))
1016 }
1017 SecretAction::Delete { name } => {
1018 let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1019 .send()
1020 .await?;
1021 ok_or_api_error(resp).await?;
1022 Ok(format!("Secret \"{name}\" deleted."))
1023 }
1024 }
1025}
1026
/// Coverage-build stand-in for `execute_secret`: makes no requests and returns
/// an empty message.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_secret(
    _client: &reqwest::Client,
    _action: &SecretAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::new())
}
1038
1039#[cfg_attr(coverage, allow(dead_code))]
1041fn format_schedules(schedules: &[serde_json::Value]) -> String {
1042 if schedules.is_empty() {
1043 return "No schedules.".into();
1044 }
1045 let mut lines = vec![format!(
1046 "{:<20} {:<18} {:<8} {:<12} {}",
1047 "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1048 )];
1049 for s in schedules {
1050 let name = s["name"].as_str().unwrap_or("?");
1051 let cron = s["cron"].as_str().unwrap_or("?");
1052 let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1053 "yes"
1054 } else {
1055 "no"
1056 };
1057 let last_run = s["last_run_at"]
1058 .as_str()
1059 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1060 let node = s["target_node"].as_str().unwrap_or("local");
1061 lines.push(format!(
1062 "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1063 ));
1064 }
1065 lines.join("\n")
1066}
1067
1068#[cfg(not(coverage))]
1072async fn select_best_node(
1073 client: &reqwest::Client,
1074 base: &str,
1075 token: Option<&str>,
1076) -> Result<(String, String)> {
1077 let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1078 .send()
1079 .await?;
1080 let text = ok_or_api_error(resp).await?;
1081 let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1082
1083 let mut best: Option<(String, String, f64)> = None; for peer in &peers_resp.peers {
1087 if peer.status != pulpo_common::peer::PeerStatus::Online {
1088 continue;
1089 }
1090 let sessions = peer.session_count.unwrap_or(0);
1091 let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1092 #[allow(clippy::cast_precision_loss)]
1094 let score = sessions as f64 - (mem as f64 / 1024.0);
1095 if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1096 best = Some((peer.address.clone(), peer.name.clone(), score));
1097 }
1098 }
1099
1100 match best {
1102 Some((addr, name, _)) => Ok((addr, name)),
1103 None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1104 }
1105}
1106
/// Coverage-build stand-in for `select_best_node`: makes no requests and always
/// picks `localhost:7433` with the name `local`.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn select_best_node(
    _client: &reqwest::Client,
    _base: &str,
    _token: Option<&str>,
) -> Result<(String, String)> {
    Ok(("localhost:7433".into(), "local".into()))
}
1117
1118#[allow(clippy::too_many_lines)]
1120pub async fn execute(cli: &Cli) -> Result<String> {
1121 let client = reqwest::Client::new();
1122 let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
1123 let url = base_url(&resolved_node);
1124 let node = &resolved_node;
1125 let token = resolve_token(&client, &url, node, cli.token.as_deref())
1126 .await
1127 .or(peer_token);
1128
1129 if cli.command.is_none() && cli.path.is_none() {
1131 use clap::CommandFactory;
1133 let mut cmd = Cli::command();
1134 cmd.print_help()?;
1135 println!();
1136 return Ok(String::new());
1137 }
1138 if cli.command.is_none() {
1139 let path = cli.path.as_deref().unwrap_or(".");
1140 let resolved_workdir = resolve_path(path);
1141 let base_name = derive_session_name(&resolved_workdir);
1142 let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
1143 let body = serde_json::json!({
1144 "name": name,
1145 "workdir": resolved_workdir,
1146 });
1147 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1148 .json(&body)
1149 .send()
1150 .await
1151 .map_err(|e| friendly_error(&e, node))?;
1152 let text = ok_or_api_error(resp).await?;
1153 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1154 let msg = format!(
1155 "Created session \"{}\" ({})",
1156 resp.session.name, resp.session.id
1157 );
1158 let backend_id = resp
1159 .session
1160 .backend_session_id
1161 .as_deref()
1162 .unwrap_or(&resp.session.name);
1163 eprintln!("{msg}");
1164 attach_session(backend_id)?;
1167 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1168 }
1169
1170 match cli.command.as_ref().unwrap() {
1171 Commands::Attach { name } => {
1172 let resp = authed_get(
1174 &client,
1175 format!("{url}/api/v1/sessions/{name}"),
1176 token.as_deref(),
1177 )
1178 .send()
1179 .await
1180 .map_err(|e| friendly_error(&e, node))?;
1181 let text = ok_or_api_error(resp).await?;
1182 let session: Session = serde_json::from_str(&text)?;
1183 match session.status.to_string().as_str() {
1184 "lost" => {
1185 anyhow::bail!(
1186 "Session \"{name}\" is lost (agent process died). Resume it first:\n pulpo resume {name}"
1187 );
1188 }
1189 "stopped" => {
1190 anyhow::bail!(
1191 "Session \"{name}\" is {} — cannot attach to a stopped session.",
1192 session.status
1193 );
1194 }
1195 _ => {}
1196 }
1197 let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1198 attach_session(&backend_id)?;
1199 Ok(format!("Detached from session {name}."))
1200 }
1201 Commands::Input { name, text } => {
1202 let input_text = text.as_deref().unwrap_or("\n");
1203 let body = serde_json::json!({ "text": input_text });
1204 let resp = authed_post(
1205 &client,
1206 format!("{url}/api/v1/sessions/{name}/input"),
1207 token.as_deref(),
1208 )
1209 .json(&body)
1210 .send()
1211 .await
1212 .map_err(|e| friendly_error(&e, node))?;
1213 ok_or_api_error(resp).await?;
1214 Ok(format!("Sent input to session {name}."))
1215 }
1216 Commands::List { all } => {
1217 let list_url = if *all {
1218 format!("{url}/api/v1/sessions")
1219 } else {
1220 format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
1221 };
1222 let resp = authed_get(&client, list_url, token.as_deref())
1223 .send()
1224 .await
1225 .map_err(|e| friendly_error(&e, node))?;
1226 let text = ok_or_api_error(resp).await?;
1227 let sessions: Vec<Session> = serde_json::from_str(&text)?;
1228 Ok(format_sessions(&sessions))
1229 }
1230 Commands::Nodes => {
1231 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1232 .send()
1233 .await
1234 .map_err(|e| friendly_error(&e, node))?;
1235 let text = ok_or_api_error(resp).await?;
1236 let resp: PeersResponse = serde_json::from_str(&text)?;
1237 Ok(format_nodes(&resp))
1238 }
1239 Commands::Spawn {
1240 workdir,
1241 name,
1242 ink,
1243 description,
1244 detach,
1245 idle_threshold,
1246 auto,
1247 worktree,
1248 runtime,
1249 secret,
1250 command,
1251 } => {
1252 let cmd = if command.is_empty() {
1253 None
1254 } else {
1255 Some(command.join(" "))
1256 };
1257 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1259 std::env::current_dir()
1260 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1261 });
1262 let resolved_name = if let Some(n) = name {
1264 n.clone()
1265 } else {
1266 let base_name = derive_session_name(&resolved_workdir);
1267 deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1268 };
1269 let mut body = serde_json::json!({
1270 "name": resolved_name,
1271 "workdir": resolved_workdir,
1272 });
1273 if let Some(c) = &cmd {
1274 body["command"] = serde_json::json!(c);
1275 }
1276 if let Some(i) = ink {
1277 body["ink"] = serde_json::json!(i);
1278 }
1279 if let Some(d) = description {
1280 body["description"] = serde_json::json!(d);
1281 }
1282 if let Some(t) = idle_threshold {
1283 body["idle_threshold_secs"] = serde_json::json!(t);
1284 }
1285 if *worktree {
1286 body["worktree"] = serde_json::json!(true);
1287 eprintln!(
1288 "Worktree: branch pulpo/{resolved_name} in ~/.pulpo/worktrees/{resolved_name}/"
1289 );
1290 }
1291 if let Some(rt) = runtime {
1292 body["runtime"] = serde_json::json!(rt);
1293 }
1294 if !secret.is_empty() {
1295 body["secrets"] = serde_json::json!(secret);
1296 }
1297 let spawn_url = if *auto {
1298 let (auto_addr, auto_name) =
1299 select_best_node(&client, &url, token.as_deref()).await?;
1300 eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1301 base_url(&auto_addr)
1302 } else {
1303 url.clone()
1304 };
1305 let resp = authed_post(
1306 &client,
1307 format!("{spawn_url}/api/v1/sessions"),
1308 token.as_deref(),
1309 )
1310 .json(&body)
1311 .send()
1312 .await
1313 .map_err(|e| friendly_error(&e, node))?;
1314 let text = ok_or_api_error(resp).await?;
1315 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1316 let msg = format!(
1317 "Created session \"{}\" ({})",
1318 resp.session.name, resp.session.id
1319 );
1320 if !detach {
1321 let backend_id = resp
1322 .session
1323 .backend_session_id
1324 .as_deref()
1325 .unwrap_or(&resp.session.name);
1326 eprintln!("{msg}");
1327 if cmd.is_some() {
1330 let sid = resp.session.id.to_string();
1331 check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1332 }
1333 attach_session(backend_id)?;
1334 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1335 }
1336 Ok(msg)
1337 }
1338 Commands::Stop { names, purge } => {
1339 let mut results = Vec::new();
1340 for name in names {
1341 let query = if *purge { "?purge=true" } else { "" };
1342 let resp = authed_post(
1343 &client,
1344 format!("{url}/api/v1/sessions/{name}/stop{query}"),
1345 token.as_deref(),
1346 )
1347 .send()
1348 .await
1349 .map_err(|e| friendly_error(&e, node))?;
1350 let action = if *purge {
1351 "stopped and purged"
1352 } else {
1353 "stopped"
1354 };
1355 match ok_or_api_error(resp).await {
1356 Ok(_) => results.push(format!("Session {name} {action}.")),
1357 Err(e) => results.push(format!("Error stopping {name}: {e}")),
1358 }
1359 }
1360 Ok(results.join("\n"))
1361 }
1362 Commands::Cleanup => {
1363 let resp = authed_post(
1364 &client,
1365 format!("{url}/api/v1/sessions/cleanup"),
1366 token.as_deref(),
1367 )
1368 .send()
1369 .await
1370 .map_err(|e| friendly_error(&e, node))?;
1371 let text = ok_or_api_error(resp).await?;
1372 let result: serde_json::Value = serde_json::from_str(&text)?;
1373 let count = result["deleted"].as_u64().unwrap_or(0);
1374 if count == 0 {
1375 Ok("No stopped or lost sessions to clean up.".into())
1376 } else {
1377 Ok(format!("Cleaned up {count} session(s)."))
1378 }
1379 }
1380 Commands::Logs {
1381 name,
1382 lines,
1383 follow,
1384 } => {
1385 if *follow {
1386 let mut stdout = std::io::stdout();
1387 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1388 .await
1389 .map_err(|e| {
1390 match e.downcast::<reqwest::Error>() {
1392 Ok(re) => friendly_error(&re, node),
1393 Err(other) => other,
1394 }
1395 })?;
1396 Ok(String::new())
1397 } else {
1398 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1399 .await
1400 .map_err(|e| match e.downcast::<reqwest::Error>() {
1401 Ok(re) => friendly_error(&re, node),
1402 Err(other) => other,
1403 })?;
1404 Ok(output)
1405 }
1406 }
1407 Commands::Interventions { name } => {
1408 let resp = authed_get(
1409 &client,
1410 format!("{url}/api/v1/sessions/{name}/interventions"),
1411 token.as_deref(),
1412 )
1413 .send()
1414 .await
1415 .map_err(|e| friendly_error(&e, node))?;
1416 let text = ok_or_api_error(resp).await?;
1417 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1418 Ok(format_interventions(&events))
1419 }
1420 Commands::Ui => {
1421 let dashboard = base_url(node);
1422 open_browser(&dashboard)?;
1423 Ok(format!("Opening {dashboard}"))
1424 }
1425 Commands::Resume { name } => {
1426 let resp = authed_post(
1427 &client,
1428 format!("{url}/api/v1/sessions/{name}/resume"),
1429 token.as_deref(),
1430 )
1431 .send()
1432 .await
1433 .map_err(|e| friendly_error(&e, node))?;
1434 let text = ok_or_api_error(resp).await?;
1435 let session: Session = serde_json::from_str(&text)?;
1436 let backend_id = session
1437 .backend_session_id
1438 .as_deref()
1439 .unwrap_or(&session.name);
1440 eprintln!("Resumed session \"{}\"", session.name);
1441 let sid = session.id.to_string();
1442 check_session_alive(&client, &url, &sid, token.as_deref()).await?;
1443 attach_session(backend_id)?;
1444 Ok(format!("Detached from session \"{}\".", session.name))
1445 }
1446 Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1447 .await
1448 .map_err(|e| match e.downcast::<reqwest::Error>() {
1449 Ok(re) => friendly_error(&re, node),
1450 Err(other) => other,
1451 }),
1452 Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
1453 .await
1454 .map_err(|e| match e.downcast::<reqwest::Error>() {
1455 Ok(re) => friendly_error(&re, node),
1456 Err(other) => other,
1457 }),
1458 }
1459}
1460
1461#[cfg(test)]
1462mod tests {
1463 use super::*;
1464
/// `base_url` turns a `host:port` node address into an `http://` URL.
#[test]
fn test_base_url() {
    let cases = [
        ("localhost:7433", "http://localhost:7433"),
        ("my-machine:9999", "http://my-machine:9999"),
    ];
    for (node, expected) in cases {
        assert_eq!(base_url(node), expected);
    }
}
1470
/// `pulpo list` selects the `List` subcommand and leaves `--node` at its default.
#[test]
fn test_cli_parse_list() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    let is_list = matches!(parsed.command, Some(Commands::List { .. }));
    assert_eq!(parsed.node, "localhost:7433");
    assert!(is_list);
}
1477
/// `pulpo nodes` selects the `Nodes` subcommand.
#[test]
fn test_cli_parse_nodes() {
    let argv = ["pulpo", "nodes"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_nodes = matches!(parsed.command, Some(Commands::Nodes));
    assert!(is_nodes);
}
1483
/// `pulpo ui` selects the `Ui` subcommand.
#[test]
fn test_cli_parse_ui() {
    let argv = ["pulpo", "ui"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_ui = matches!(parsed.command, Some(Commands::Ui));
    assert!(is_ui);
}
1489
/// With `--node` given first, the trailing `ui` token ends up in the positional
/// `path` argument, and the custom node address is kept as given.
#[test]
fn test_cli_parse_ui_custom_node() {
    let argv = ["pulpo", "--node", "mac-mini:7433", "ui"];
    let Cli { node, path, .. } = Cli::try_parse_from(argv).unwrap();
    assert_eq!(node, "mac-mini:7433");
    assert_eq!(path.as_deref(), Some("ui"));
}
1497
/// The browser-opening command gets the URL as its only argument and, on macOS
/// and Linux, uses `open` / `xdg-open` respectively as the program.
#[test]
fn test_build_open_command() {
    let url = "http://localhost:7433";
    let opener = build_open_command(url);
    let argv = opener.get_args().collect::<Vec<&std::ffi::OsStr>>();
    assert_eq!(argv, vec![url]);
    #[cfg(target_os = "macos")]
    assert_eq!(opener.get_program(), "open");
    #[cfg(target_os = "linux")]
    assert_eq!(opener.get_program(), "xdg-open");
}
1508
/// `spawn` accepts a positional name, `--workdir`, and everything after `--`
/// as the command to run.
#[test]
fn test_cli_parse_spawn() {
    let argv = [
        "pulpo",
        "spawn",
        "my-task",
        "--workdir",
        "/tmp/repo",
        "--",
        "claude",
        "-p",
        "Fix the bug",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let as_expected = matches!(
        &parsed.command,
        Some(Commands::Spawn { name, workdir, command, .. })
            if name.as_deref() == Some("my-task")
                && workdir.as_deref() == Some("/tmp/repo")
                && command == &["claude", "-p", "Fix the bug"]
    );
    assert!(as_expected);
}
1530
/// `spawn --ink <name>` stores the ink name on the `Spawn` subcommand.
#[test]
fn test_cli_parse_spawn_with_ink() {
    let argv = ["pulpo", "spawn", "my-task", "--ink", "coder"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    // Any other subcommand yields `None`, which fails the comparison below.
    let ink = match &parsed.command {
        Some(Commands::Spawn { ink, .. }) => ink.as_deref(),
        _ => None,
    };
    assert_eq!(ink, Some("coder"));
}
1539
/// `spawn --description <text>` stores the description on the `Spawn` subcommand.
#[test]
fn test_cli_parse_spawn_with_description() {
    let argv = ["pulpo", "spawn", "my-task", "--description", "Fix the bug"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    // Any other subcommand yields `None`, which fails the comparison below.
    let description = match &parsed.command {
        Some(Commands::Spawn { description, .. }) => description.as_deref(),
        _ => None,
    };
    assert_eq!(description, Some("Fix the bug"));
}
1550
/// The session name is the first positional of `spawn`; the command follows `--`.
#[test]
fn test_cli_parse_spawn_name_positional() {
    let argv = ["pulpo", "spawn", "portal", "--", "echo", "hello"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let as_expected = matches!(
        &parsed.command,
        Some(Commands::Spawn { name, command, .. })
            if name.as_deref() == Some("portal") && command == &["echo", "hello"]
    );
    assert!(as_expected);
}
1560
/// Without a trailing `-- <command>`, the parsed `command` vector is empty.
#[test]
fn test_cli_parse_spawn_no_command() {
    let argv = ["pulpo", "spawn", "my-task"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let command_is_empty = matches!(
        &parsed.command,
        Some(Commands::Spawn { command, .. }) if command.is_empty()
    );
    assert!(command_is_empty);
}
1569
/// `--idle-threshold 0` is accepted and parsed as `Some(0)`.
#[test]
fn test_cli_parse_spawn_idle_threshold() {
    let argv = ["pulpo", "spawn", "my-task", "--idle-threshold", "0"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let threshold_is_zero = matches!(
        &parsed.command,
        Some(Commands::Spawn { idle_threshold: Some(0), .. })
    );
    assert!(threshold_is_zero);
}
1579
/// `--idle-threshold 60` is parsed as `Some(60)`.
#[test]
fn test_cli_parse_spawn_idle_threshold_60() {
    let argv = ["pulpo", "spawn", "my-task", "--idle-threshold", "60"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    // Any other subcommand yields `None`, which fails the comparison below.
    let threshold = match &parsed.command {
        Some(Commands::Spawn { idle_threshold, .. }) => *idle_threshold,
        _ => None,
    };
    assert_eq!(threshold, Some(60));
}
1589
/// Repeating `--secret` collects every value, in order, into the `secret` list.
#[test]
fn test_cli_parse_spawn_secrets() {
    let argv = [
        "pulpo",
        "spawn",
        "my-task",
        "--secret",
        "GITHUB_TOKEN",
        "--secret",
        "NPM_TOKEN",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let secrets_match = matches!(
        &parsed.command,
        Some(Commands::Spawn { secret, .. }) if secret == &["GITHUB_TOKEN", "NPM_TOKEN"]
    );
    assert!(secrets_match);
}
1607
/// Without any `--secret` flag the `secret` list is empty.
#[test]
fn test_cli_parse_spawn_no_secrets() {
    let argv = ["pulpo", "spawn", "my-task"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let no_secrets = matches!(
        &parsed.command,
        Some(Commands::Spawn { secret, .. }) if secret.is_empty()
    );
    assert!(no_secrets);
}
1616
/// `secret set <name> <value> --env <VAR>` fills all three fields of `SecretAction::Set`.
#[test]
fn test_cli_parse_secret_set_with_env() {
    let argv = [
        "pulpo",
        "secret",
        "set",
        "GH_WORK",
        "token123",
        "--env",
        "GITHUB_TOKEN",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let as_expected = matches!(
        &parsed.command,
        Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
            if name == "GH_WORK" && value == "token123" && env.as_deref() == Some("GITHUB_TOKEN")
    );
    assert!(as_expected);
}
1635
/// `secret set <name> <value>` without `--env` leaves `env` unset.
#[test]
fn test_cli_parse_secret_set_without_env() {
    let argv = ["pulpo", "secret", "set", "MY_KEY", "val"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let as_expected = matches!(
        &parsed.command,
        Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
            if name == "MY_KEY" && value == "val" && env.is_none()
    );
    assert!(as_expected);
}
1645
/// `spawn --worktree` sets the `worktree` flag.
#[test]
fn test_cli_parse_spawn_worktree() {
    let argv = ["pulpo", "spawn", "my-task", "--worktree"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let worktree_enabled = matches!(
        &parsed.command,
        Some(Commands::Spawn { worktree: true, .. })
    );
    assert!(worktree_enabled);
}
1654
/// `spawn --detach` sets the `detach` flag.
#[test]
fn test_cli_parse_spawn_detach() {
    let argv = ["pulpo", "spawn", "my-task", "--detach"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let detached = matches!(
        &parsed.command,
        Some(Commands::Spawn { detach: true, .. })
    );
    assert!(detached);
}
1663
/// The short form `-d` sets the `detach` flag just like `--detach`.
#[test]
fn test_cli_parse_spawn_detach_short() {
    let argv = ["pulpo", "spawn", "my-task", "-d"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    // Any other subcommand counts as "not detached" and fails the assertion.
    let detached = match &parsed.command {
        Some(Commands::Spawn { detach, .. }) => *detach,
        _ => false,
    };
    assert!(detached);
}
1672
/// Without `--detach` / `-d`, the `detach` flag defaults to `false`.
#[test]
fn test_cli_parse_spawn_detach_default() {
    let argv = ["pulpo", "spawn", "my-task"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let attached_by_default = matches!(
        &parsed.command,
        Some(Commands::Spawn { detach: false, .. })
    );
    assert!(attached_by_default);
}
1681
/// `logs <name>` defaults to 100 lines and does not follow.
#[test]
fn test_cli_parse_logs() {
    let argv = ["pulpo", "logs", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let as_expected = matches!(
        &parsed.command,
        Some(Commands::Logs { name, lines: 100, follow: false }) if name == "my-session"
    );
    assert!(as_expected);
}
1690
/// `logs <name> --lines 50` overrides the default line count; follow stays off.
#[test]
fn test_cli_parse_logs_with_lines() {
    let argv = ["pulpo", "logs", "my-session", "--lines", "50"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let as_expected = matches!(
        &parsed.command,
        Some(Commands::Logs { name, lines: 50, follow: false }) if name == "my-session"
    );
    assert!(as_expected);
}
1699
/// `logs <name> --follow` sets the `follow` flag.
#[test]
fn test_cli_parse_logs_follow() {
    let argv = ["pulpo", "logs", "my-session", "--follow"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let following = matches!(
        &parsed.command,
        Some(Commands::Logs { name, follow: true, .. }) if name == "my-session"
    );
    assert!(following);
}
1708
/// The short form `-f` sets the `follow` flag just like `--follow`.
#[test]
fn test_cli_parse_logs_follow_short() {
    let argv = ["pulpo", "logs", "my-session", "-f"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let following = matches!(
        &parsed.command,
        Some(Commands::Logs { name, follow: true, .. }) if name == "my-session"
    );
    assert!(following);
}
1717
/// `stop <name>` collects the session name and leaves `purge` off.
#[test]
fn test_cli_parse_stop() {
    let argv = ["pulpo", "stop", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let as_expected = matches!(
        &parsed.command,
        Some(Commands::Stop { names, purge: false }) if names == &["my-session"]
    );
    assert!(as_expected);
}
1726
/// Both `--purge` and its short form `-p` turn on the `purge` flag of `stop`.
#[test]
fn test_cli_parse_stop_purge() {
    // Long form first, then short form, matching the order of the original checks.
    for flag in ["--purge", "-p"] {
        let parsed = Cli::try_parse_from(["pulpo", "stop", "my-session", flag]).unwrap();
        let purging = matches!(
            &parsed.command,
            Some(Commands::Stop { names, purge: true }) if names == &["my-session"]
        );
        assert!(purging);
    }
}
1741
/// `kill` is an alias that parses to the `Stop` subcommand without purge.
#[test]
fn test_cli_parse_kill_alias() {
    let argv = ["pulpo", "kill", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let as_expected = matches!(
        &parsed.command,
        Some(Commands::Stop { names, purge: false }) if names == &["my-session"]
    );
    assert!(as_expected);
}
1750
/// `resume <name>` parses to the `Resume` subcommand carrying the session name.
#[test]
fn test_cli_parse_resume() {
    let argv = ["pulpo", "resume", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let as_expected = matches!(
        &parsed.command,
        Some(Commands::Resume { name }) if name == "my-session"
    );
    assert!(as_expected);
}
1759
/// `input <name> <text>` captures both the session name and the text to send.
#[test]
fn test_cli_parse_input() {
    let argv = ["pulpo", "input", "my-session", "yes"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let as_expected = matches!(
        &parsed.command,
        Some(Commands::Input { name, text })
            if name == "my-session" && text.as_deref() == Some("yes")
    );
    assert!(as_expected);
}
1768
/// `input <name>` without text parses with `text` left as `None`.
#[test]
fn test_cli_parse_input_no_text() {
    let argv = ["pulpo", "input", "my-session"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let as_expected = matches!(
        &parsed.command,
        Some(Commands::Input { name, text: None }) if name == "my-session"
    );
    assert!(as_expected);
}
1777
/// The `i` alias parses to the `Input` subcommand.
#[test]
fn test_cli_parse_input_alias() {
    let argv = ["pulpo", "i", "my-session", "y"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let as_expected = matches!(
        &parsed.command,
        Some(Commands::Input { name, text })
            if name == "my-session" && text.as_deref() == Some("y")
    );
    assert!(as_expected);
}
1786
/// With `--node` given first, the trailing `list` token ends up in the positional
/// `path` argument, and the custom node address is kept as given.
#[test]
fn test_cli_parse_custom_node() {
    let argv = ["pulpo", "--node", "win-pc:8080", "list"];
    let Cli { node, path, .. } = Cli::try_parse_from(argv).unwrap();
    assert_eq!(node, "win-pc:8080");
    assert_eq!(path.as_deref(), Some("list"));
}
1794
/// `--version` makes clap return a `DisplayVersion` "error" instead of a parsed `Cli`.
#[test]
fn test_cli_version() {
    let outcome = Cli::try_parse_from(["pulpo", "--version"]);
    let kind = outcome.unwrap_err().kind();
    assert_eq!(kind, clap::error::ErrorKind::DisplayVersion);
}
1802
/// Invoking `pulpo` with no arguments parses successfully with neither a
/// subcommand nor a path.
#[test]
fn test_cli_parse_no_subcommand_succeeds() {
    let Cli { command, path, .. } = Cli::try_parse_from(["pulpo"]).unwrap();
    assert!(command.is_none());
    assert!(path.is_none());
}
1809
/// The `Debug` rendering of a parsed `Cli` mentions the selected subcommand.
#[test]
fn test_cli_debug() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    let rendered = format!("{:?}", parsed);
    assert!(rendered.contains("List"));
}
1816
/// The derived `Debug` output of `Commands::List` has the expected exact form.
#[test]
fn test_commands_debug() {
    let rendered = format!("{:?}", Commands::List { all: false });
    assert_eq!(rendered, "List { all: false }");
}
1822
/// Canned JSON for a single session, served by the stub server's session routes
/// and embedded in the create-session response body.
///
/// The literal is split across lines with `concat!` purely for readability; the
/// resulting string has no whitespace between the pieces.
const TEST_SESSION_JSON: &str = concat!(
    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","#,
    r#""command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"#,
    r#""backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"#,
    r#""intervention_code":null,"intervention_reason":null,"intervention_at":null,"#,
    r#""last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"#,
    r#""created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#,
);
1825
1826 fn test_create_response_json() -> String {
1828 format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1829 }
1830
1831 async fn start_test_server() -> String {
1833 use axum::http::StatusCode;
1834 use axum::{
1835 Json, Router,
1836 routing::{get, post},
1837 };
1838
1839 let create_json = test_create_response_json();
1840
1841 let app = Router::new()
1842 .route(
1843 "/api/v1/sessions",
1844 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1845 (StatusCode::CREATED, create_json.clone())
1846 }),
1847 )
1848 .route(
1849 "/api/v1/sessions/{id}",
1850 get(|| async { TEST_SESSION_JSON.to_owned() }),
1851 )
1852 .route(
1853 "/api/v1/sessions/{id}/stop",
1854 post(|| async { StatusCode::NO_CONTENT }),
1855 )
1856 .route(
1857 "/api/v1/sessions/{id}/output",
1858 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1859 )
1860 .route(
1861 "/api/v1/peers",
1862 get(|| async {
1863 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1864 }),
1865 )
1866 .route(
1867 "/api/v1/sessions/{id}/resume",
1868 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1869 )
1870 .route(
1871 "/api/v1/sessions/{id}/interventions",
1872 get(|| async { "[]".to_owned() }),
1873 )
1874 .route(
1875 "/api/v1/sessions/{id}/input",
1876 post(|| async { StatusCode::NO_CONTENT }),
1877 )
1878 .route(
1879 "/api/v1/schedules",
1880 get(|| async { Json::<Vec<()>>(vec![]) })
1881 .post(|| async { StatusCode::CREATED }),
1882 )
1883 .route(
1884 "/api/v1/schedules/{id}",
1885 axum::routing::put(|| async { StatusCode::OK })
1886 .delete(|| async { StatusCode::NO_CONTENT }),
1887 );
1888
1889 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1890 let addr = listener.local_addr().unwrap();
1891 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1892 format!("127.0.0.1:{}", addr.port())
1893 }
1894
1895 #[tokio::test]
1896 async fn test_execute_list_success() {
1897 let node = start_test_server().await;
1898 let cli = Cli {
1899 node,
1900 token: None,
1901 command: Some(Commands::List { all: false }),
1902 path: None,
1903 };
1904 let result = execute(&cli).await.unwrap();
1905 assert_eq!(result, "No sessions.");
1906 }
1907
1908 #[tokio::test]
1909 async fn test_execute_nodes_success() {
1910 let node = start_test_server().await;
1911 let cli = Cli {
1912 node,
1913 token: None,
1914 command: Some(Commands::Nodes),
1915 path: None,
1916 };
1917 let result = execute(&cli).await.unwrap();
1918 assert!(result.contains("test"));
1919 assert!(result.contains("(local)"));
1920 assert!(result.contains("NAME"));
1921 }
1922
1923 #[tokio::test]
1924 async fn test_execute_spawn_success() {
1925 let node = start_test_server().await;
1926 let cli = Cli {
1927 node,
1928 token: None,
1929 command: Some(Commands::Spawn {
1930 name: Some("test".into()),
1931 workdir: Some("/tmp/repo".into()),
1932 ink: None,
1933 description: None,
1934 detach: true,
1935 idle_threshold: None,
1936 auto: false,
1937 worktree: false,
1938 runtime: None,
1939 secret: vec![],
1940 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1941 }),
1942 path: None,
1943 };
1944 let result = execute(&cli).await.unwrap();
1945 assert!(result.contains("Created session"));
1946 assert!(result.contains("repo"));
1947 }
1948
1949 #[tokio::test]
1950 async fn test_execute_spawn_with_all_flags() {
1951 let node = start_test_server().await;
1952 let cli = Cli {
1953 node,
1954 token: None,
1955 command: Some(Commands::Spawn {
1956 name: Some("test".into()),
1957 workdir: Some("/tmp/repo".into()),
1958 ink: Some("coder".into()),
1959 description: Some("Fix the bug".into()),
1960 detach: true,
1961 idle_threshold: None,
1962 auto: false,
1963 worktree: false,
1964 runtime: None,
1965 secret: vec![],
1966 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1967 }),
1968 path: None,
1969 };
1970 let result = execute(&cli).await.unwrap();
1971 assert!(result.contains("Created session"));
1972 }
1973
1974 #[tokio::test]
1975 async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
1976 let node = start_test_server().await;
1977 let cli = Cli {
1978 node,
1979 token: None,
1980 command: Some(Commands::Spawn {
1981 name: Some("full-opts".into()),
1982 workdir: Some("/tmp/repo".into()),
1983 ink: Some("coder".into()),
1984 description: Some("Full options".into()),
1985 detach: true,
1986 idle_threshold: Some(120),
1987 auto: false,
1988 worktree: true,
1989 runtime: Some("docker".into()),
1990 secret: vec![],
1991 command: vec!["claude".into()],
1992 }),
1993 path: None,
1994 };
1995 let result = execute(&cli).await.unwrap();
1996 assert!(result.contains("Created session"));
1997 }
1998
1999 #[tokio::test]
2000 async fn test_execute_spawn_no_name_derives_from_workdir() {
2001 let node = start_test_server().await;
2002 let cli = Cli {
2003 node,
2004 token: None,
2005 command: Some(Commands::Spawn {
2006 name: None,
2007 workdir: Some("/tmp/my-project".into()),
2008 ink: None,
2009 description: None,
2010 detach: true,
2011 idle_threshold: None,
2012 auto: false,
2013 worktree: false,
2014 runtime: None,
2015 secret: vec![],
2016 command: vec!["echo".into(), "hello".into()],
2017 }),
2018 path: None,
2019 };
2020 let result = execute(&cli).await.unwrap();
2021 assert!(result.contains("Created session"));
2022 }
2023
2024 #[tokio::test]
2025 async fn test_execute_spawn_no_command() {
2026 let node = start_test_server().await;
2027 let cli = Cli {
2028 node,
2029 token: None,
2030 command: Some(Commands::Spawn {
2031 name: Some("test".into()),
2032 workdir: Some("/tmp/repo".into()),
2033 ink: None,
2034 description: None,
2035 detach: true,
2036 idle_threshold: None,
2037 auto: false,
2038 worktree: false,
2039 runtime: None,
2040 secret: vec![],
2041 command: vec![],
2042 }),
2043 path: None,
2044 };
2045 let result = execute(&cli).await.unwrap();
2046 assert!(result.contains("Created session"));
2047 }
2048
2049 #[tokio::test]
2050 async fn test_execute_spawn_with_name() {
2051 let node = start_test_server().await;
2052 let cli = Cli {
2053 node,
2054 token: None,
2055 command: Some(Commands::Spawn {
2056 name: Some("my-task".into()),
2057 workdir: Some("/tmp/repo".into()),
2058 ink: None,
2059 description: None,
2060 detach: true,
2061 idle_threshold: None,
2062 auto: false,
2063 worktree: false,
2064 runtime: None,
2065 secret: vec![],
2066 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2067 }),
2068 path: None,
2069 };
2070 let result = execute(&cli).await.unwrap();
2071 assert!(result.contains("Created session"));
2072 }
2073
2074 #[tokio::test]
2075 async fn test_execute_spawn_auto_attach() {
2076 let node = start_test_server().await;
2077 let cli = Cli {
2078 node,
2079 token: None,
2080 command: Some(Commands::Spawn {
2081 name: Some("test".into()),
2082 workdir: Some("/tmp/repo".into()),
2083 ink: None,
2084 description: None,
2085 detach: false,
2086 idle_threshold: None,
2087 auto: false,
2088 worktree: false,
2089 runtime: None,
2090 secret: vec![],
2091 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2092 }),
2093 path: None,
2094 };
2095 let result = execute(&cli).await.unwrap();
2096 assert!(result.contains("Detached from session"));
2098 }
2099
2100 #[tokio::test]
2101 async fn test_execute_stop_success() {
2102 let node = start_test_server().await;
2103 let cli = Cli {
2104 node,
2105 token: None,
2106 command: Some(Commands::Stop {
2107 names: vec!["test-session".into()],
2108 purge: false,
2109 }),
2110 path: None,
2111 };
2112 let result = execute(&cli).await.unwrap();
2113 assert!(result.contains("stopped"));
2114 assert!(!result.contains("purged"));
2115 }
2116
2117 #[tokio::test]
2118 async fn test_execute_stop_with_purge() {
2119 let node = start_test_server().await;
2120 let cli = Cli {
2121 node,
2122 token: None,
2123 command: Some(Commands::Stop {
2124 names: vec!["test-session".into()],
2125 purge: true,
2126 }),
2127 path: None,
2128 };
2129 let result = execute(&cli).await.unwrap();
2130 assert!(result.contains("stopped and purged"));
2131 }
2132
2133 #[tokio::test]
2134 async fn test_execute_logs_success() {
2135 let node = start_test_server().await;
2136 let cli = Cli {
2137 node,
2138 token: None,
2139 command: Some(Commands::Logs {
2140 name: "test-session".into(),
2141 lines: 50,
2142 follow: false,
2143 }),
2144 path: None,
2145 };
2146 let result = execute(&cli).await.unwrap();
2147 assert!(result.contains("test output"));
2148 }
2149
2150 #[tokio::test]
2151 async fn test_execute_list_connection_refused() {
2152 let cli = Cli {
2153 node: "localhost:1".into(),
2154 token: None,
2155 command: Some(Commands::List { all: false }),
2156 path: None,
2157 };
2158 let result = execute(&cli).await;
2159 let err = result.unwrap_err().to_string();
2160 assert!(
2161 err.contains("Could not connect to pulpod"),
2162 "Expected friendly error, got: {err}"
2163 );
2164 assert!(err.contains("localhost:1"));
2165 }
2166
2167 #[tokio::test]
2168 async fn test_execute_nodes_connection_refused() {
2169 let cli = Cli {
2170 node: "localhost:1".into(),
2171 token: None,
2172 command: Some(Commands::Nodes),
2173 path: None,
2174 };
2175 let result = execute(&cli).await;
2176 let err = result.unwrap_err().to_string();
2177 assert!(err.contains("Could not connect to pulpod"));
2178 }
2179
2180 #[tokio::test]
2181 async fn test_execute_stop_error_response() {
2182 use axum::{Router, http::StatusCode, routing::post};
2183
2184 let app = Router::new().route(
2185 "/api/v1/sessions/{id}/stop",
2186 post(|| async {
2187 (
2188 StatusCode::NOT_FOUND,
2189 "{\"error\":\"session not found: test-session\"}",
2190 )
2191 }),
2192 );
2193 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2194 let addr = listener.local_addr().unwrap();
2195 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2196 let node = format!("127.0.0.1:{}", addr.port());
2197
2198 let cli = Cli {
2199 node,
2200 token: None,
2201 command: Some(Commands::Stop {
2202 names: vec!["test-session".into()],
2203 purge: false,
2204 }),
2205 path: None,
2206 };
2207 let result = execute(&cli).await.unwrap();
2208 assert!(result.contains("Error stopping test-session"), "{result}");
2209 }
2210
2211 #[tokio::test]
2212 async fn test_execute_logs_error_response() {
2213 use axum::{Router, http::StatusCode, routing::get};
2214
2215 let app = Router::new().route(
2216 "/api/v1/sessions/{id}/output",
2217 get(|| async {
2218 (
2219 StatusCode::NOT_FOUND,
2220 "{\"error\":\"session not found: ghost\"}",
2221 )
2222 }),
2223 );
2224 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2225 let addr = listener.local_addr().unwrap();
2226 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2227 let node = format!("127.0.0.1:{}", addr.port());
2228
2229 let cli = Cli {
2230 node,
2231 token: None,
2232 command: Some(Commands::Logs {
2233 name: "ghost".into(),
2234 lines: 50,
2235 follow: false,
2236 }),
2237 path: None,
2238 };
2239 let err = execute(&cli).await.unwrap_err();
2240 assert_eq!(err.to_string(), "session not found: ghost");
2241 }
2242
2243 #[tokio::test]
2244 async fn test_execute_resume_error_response() {
2245 use axum::{Router, http::StatusCode, routing::post};
2246
2247 let app = Router::new().route(
2248 "/api/v1/sessions/{id}/resume",
2249 post(|| async {
2250 (
2251 StatusCode::BAD_REQUEST,
2252 "{\"error\":\"session is not lost (status: active)\"}",
2253 )
2254 }),
2255 );
2256 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2257 let addr = listener.local_addr().unwrap();
2258 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2259 let node = format!("127.0.0.1:{}", addr.port());
2260
2261 let cli = Cli {
2262 node,
2263 token: None,
2264 command: Some(Commands::Resume {
2265 name: "test-session".into(),
2266 }),
2267 path: None,
2268 };
2269 let err = execute(&cli).await.unwrap_err();
2270 assert_eq!(err.to_string(), "session is not lost (status: active)");
2271 }
2272
2273 #[tokio::test]
2274 async fn test_execute_spawn_error_response() {
2275 use axum::{Router, http::StatusCode, routing::post};
2276
2277 let app = Router::new().route(
2278 "/api/v1/sessions",
2279 post(|| async {
2280 (
2281 StatusCode::INTERNAL_SERVER_ERROR,
2282 "{\"error\":\"failed to spawn session\"}",
2283 )
2284 }),
2285 );
2286 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2287 let addr = listener.local_addr().unwrap();
2288 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2289 let node = format!("127.0.0.1:{}", addr.port());
2290
2291 let cli = Cli {
2292 node,
2293 token: None,
2294 command: Some(Commands::Spawn {
2295 name: Some("test".into()),
2296 workdir: Some("/tmp/repo".into()),
2297 ink: None,
2298 description: None,
2299 detach: true,
2300 idle_threshold: None,
2301 auto: false,
2302 worktree: false,
2303 runtime: None,
2304 secret: vec![],
2305 command: vec!["test".into()],
2306 }),
2307 path: None,
2308 };
2309 let err = execute(&cli).await.unwrap_err();
2310 assert_eq!(err.to_string(), "failed to spawn session");
2311 }
2312
2313 #[tokio::test]
2314 async fn test_execute_interventions_error_response() {
2315 use axum::{Router, http::StatusCode, routing::get};
2316
2317 let app = Router::new().route(
2318 "/api/v1/sessions/{id}/interventions",
2319 get(|| async {
2320 (
2321 StatusCode::NOT_FOUND,
2322 "{\"error\":\"session not found: ghost\"}",
2323 )
2324 }),
2325 );
2326 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2327 let addr = listener.local_addr().unwrap();
2328 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2329 let node = format!("127.0.0.1:{}", addr.port());
2330
2331 let cli = Cli {
2332 node,
2333 token: None,
2334 command: Some(Commands::Interventions {
2335 name: "ghost".into(),
2336 }),
2337 path: None,
2338 };
2339 let err = execute(&cli).await.unwrap_err();
2340 assert_eq!(err.to_string(), "session not found: ghost");
2341 }
2342
2343 #[tokio::test]
2344 async fn test_execute_resume_success() {
2345 let node = start_test_server().await;
2346 let cli = Cli {
2347 node,
2348 token: None,
2349 command: Some(Commands::Resume {
2350 name: "test-session".into(),
2351 }),
2352 path: None,
2353 };
2354 let result = execute(&cli).await.unwrap();
2355 assert!(result.contains("Detached from session"));
2356 }
2357
2358 #[tokio::test]
2359 async fn test_execute_input_success() {
2360 let node = start_test_server().await;
2361 let cli = Cli {
2362 node,
2363 token: None,
2364 command: Some(Commands::Input {
2365 name: "test-session".into(),
2366 text: Some("yes".into()),
2367 }),
2368 path: None,
2369 };
2370 let result = execute(&cli).await.unwrap();
2371 assert!(result.contains("Sent input to session test-session"));
2372 }
2373
2374 #[tokio::test]
2375 async fn test_execute_input_no_text() {
2376 let node = start_test_server().await;
2377 let cli = Cli {
2378 node,
2379 token: None,
2380 command: Some(Commands::Input {
2381 name: "test-session".into(),
2382 text: None,
2383 }),
2384 path: None,
2385 };
2386 let result = execute(&cli).await.unwrap();
2387 assert!(result.contains("Sent input to session test-session"));
2388 }
2389
2390 #[tokio::test]
2391 async fn test_execute_input_connection_refused() {
2392 let cli = Cli {
2393 node: "localhost:1".into(),
2394 token: None,
2395 command: Some(Commands::Input {
2396 name: "test".into(),
2397 text: Some("y".into()),
2398 }),
2399 path: None,
2400 };
2401 let result = execute(&cli).await;
2402 let err = result.unwrap_err().to_string();
2403 assert!(err.contains("Could not connect to pulpod"));
2404 }
2405
2406 #[tokio::test]
2407 async fn test_execute_input_error_response() {
2408 use axum::{Router, http::StatusCode, routing::post};
2409
2410 let app = Router::new().route(
2411 "/api/v1/sessions/{id}/input",
2412 post(|| async {
2413 (
2414 StatusCode::NOT_FOUND,
2415 "{\"error\":\"session not found: ghost\"}",
2416 )
2417 }),
2418 );
2419 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2420 let addr = listener.local_addr().unwrap();
2421 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2422 let node = format!("127.0.0.1:{}", addr.port());
2423
2424 let cli = Cli {
2425 node,
2426 token: None,
2427 command: Some(Commands::Input {
2428 name: "ghost".into(),
2429 text: Some("y".into()),
2430 }),
2431 path: None,
2432 };
2433 let err = execute(&cli).await.unwrap_err();
2434 assert_eq!(err.to_string(), "session not found: ghost");
2435 }
2436
2437 #[tokio::test]
2438 async fn test_execute_ui() {
2439 let cli = Cli {
2440 node: "localhost:7433".into(),
2441 token: None,
2442 command: Some(Commands::Ui),
2443 path: None,
2444 };
2445 let result = execute(&cli).await.unwrap();
2446 assert!(result.contains("Opening"));
2447 assert!(result.contains("http://localhost:7433"));
2448 }
2449
2450 #[tokio::test]
2451 async fn test_execute_ui_custom_node() {
2452 let cli = Cli {
2453 node: "mac-mini:7433".into(),
2454 token: None,
2455 command: Some(Commands::Ui),
2456 path: None,
2457 };
2458 let result = execute(&cli).await.unwrap();
2459 assert!(result.contains("http://mac-mini:7433"));
2460 }
2461
/// An empty session list is rendered as the "No sessions." placeholder.
#[test]
fn test_format_sessions_empty() {
    let rendered = format_sessions(&[]);
    assert_eq!(rendered, "No sessions.");
}
2466
/// A single active tmux session is rendered as a table that contains the
/// column headers plus the session's short id, name, status, runtime, and command.
#[test]
fn test_format_sessions_with_data() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let row = Session {
        id: Uuid::nil(),
        name: "my-api".into(),
        workdir: "/tmp/repo".into(),
        command: "claude -p 'Fix the bug'".into(),
        description: Some("Fix the bug".into()),
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let table = format_sessions(&[row]);

    // Column headers first, then the row's values.
    let expected_fragments = [
        "ID",
        "NAME",
        "RUNTIME",
        "COMMAND",
        "00000000",
        "my-api",
        "active",
        "tmux",
        "claude -p 'Fix the bug'",
    ];
    for fragment in expected_fragments {
        assert!(table.contains(fragment));
    }
}
2507
2508 #[test]
2509 fn test_format_sessions_docker_runtime() {
2510 use chrono::Utc;
2511 use pulpo_common::session::SessionStatus;
2512 use uuid::Uuid;
2513
2514 let sessions = vec![Session {
2515 id: Uuid::nil(),
2516 name: "sandbox-test".into(),
2517 workdir: "/tmp".into(),
2518 command: "claude".into(),
2519 description: None,
2520 status: SessionStatus::Active,
2521 exit_code: None,
2522 backend_session_id: Some("docker:pulpo-sandbox-test".into()),
2523 output_snapshot: None,
2524 metadata: None,
2525 ink: None,
2526 intervention_code: None,
2527 intervention_reason: None,
2528 intervention_at: None,
2529 last_output_at: None,
2530 idle_since: None,
2531 idle_threshold_secs: None,
2532 worktree_path: None,
2533 runtime: Runtime::Docker,
2534 created_at: Utc::now(),
2535 updated_at: Utc::now(),
2536 }];
2537 let output = format_sessions(&sessions);
2538 assert!(output.contains("docker"));
2539 }
2540
2541 #[test]
2542 fn test_format_sessions_long_command_truncated() {
2543 use chrono::Utc;
2544 use pulpo_common::session::SessionStatus;
2545 use uuid::Uuid;
2546
2547 let sessions = vec![Session {
2548 id: Uuid::nil(),
2549 name: "test".into(),
2550 workdir: "/tmp".into(),
2551 command:
2552 "claude -p 'A very long command that exceeds fifty characters in total length here'"
2553 .into(),
2554 description: None,
2555 status: SessionStatus::Ready,
2556 exit_code: None,
2557 backend_session_id: None,
2558 output_snapshot: None,
2559 metadata: None,
2560 ink: None,
2561 intervention_code: None,
2562 intervention_reason: None,
2563 intervention_at: None,
2564 last_output_at: None,
2565 idle_since: None,
2566 idle_threshold_secs: None,
2567 worktree_path: None,
2568 runtime: Runtime::Tmux,
2569 created_at: Utc::now(),
2570 updated_at: Utc::now(),
2571 }];
2572 let output = format_sessions(&sessions);
2573 assert!(output.contains("..."));
2574 }
2575
2576 #[test]
2577 fn test_format_sessions_worktree_indicator() {
2578 use chrono::Utc;
2579 use pulpo_common::session::SessionStatus;
2580 use uuid::Uuid;
2581
2582 let sessions = vec![Session {
2583 id: Uuid::nil(),
2584 name: "wt-task".into(),
2585 workdir: "/repo".into(),
2586 command: "claude".into(),
2587 description: None,
2588 status: SessionStatus::Active,
2589 exit_code: None,
2590 backend_session_id: None,
2591 output_snapshot: None,
2592 metadata: None,
2593 ink: None,
2594 intervention_code: None,
2595 intervention_reason: None,
2596 intervention_at: None,
2597 last_output_at: None,
2598 idle_since: None,
2599 idle_threshold_secs: None,
2600 worktree_path: Some("/home/user/.pulpo/worktrees/wt-task".into()),
2601 runtime: Runtime::Tmux,
2602 created_at: Utc::now(),
2603 updated_at: Utc::now(),
2604 }];
2605 let output = format_sessions(&sessions);
2606 assert!(
2607 output.contains("[wt]"),
2608 "should show worktree indicator: {output}"
2609 );
2610 assert!(output.contains("wt-task [wt]"));
2611 }
2612
2613 #[test]
2614 fn test_format_sessions_pr_indicator() {
2615 use chrono::Utc;
2616 use pulpo_common::session::SessionStatus;
2617 use std::collections::HashMap;
2618 use uuid::Uuid;
2619
2620 let mut meta = HashMap::new();
2621 meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
2622 let sessions = vec![Session {
2623 id: Uuid::nil(),
2624 name: "pr-task".into(),
2625 workdir: "/tmp".into(),
2626 command: "claude".into(),
2627 description: None,
2628 status: SessionStatus::Active,
2629 exit_code: None,
2630 backend_session_id: None,
2631 output_snapshot: None,
2632 metadata: Some(meta),
2633 ink: None,
2634 intervention_code: None,
2635 intervention_reason: None,
2636 intervention_at: None,
2637 last_output_at: None,
2638 idle_since: None,
2639 idle_threshold_secs: None,
2640 worktree_path: None,
2641 runtime: Runtime::Tmux,
2642 created_at: Utc::now(),
2643 updated_at: Utc::now(),
2644 }];
2645 let output = format_sessions(&sessions);
2646 assert!(
2647 output.contains("[PR]"),
2648 "should show PR indicator: {output}"
2649 );
2650 assert!(output.contains("pr-task [PR]"));
2651 }
2652
2653 #[test]
2654 fn test_format_sessions_worktree_and_pr_indicator() {
2655 use chrono::Utc;
2656 use pulpo_common::session::SessionStatus;
2657 use std::collections::HashMap;
2658 use uuid::Uuid;
2659
2660 let mut meta = HashMap::new();
2661 meta.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());
2662 let sessions = vec![Session {
2663 id: Uuid::nil(),
2664 name: "both-task".into(),
2665 workdir: "/tmp".into(),
2666 command: "claude".into(),
2667 description: None,
2668 status: SessionStatus::Active,
2669 exit_code: None,
2670 backend_session_id: None,
2671 output_snapshot: None,
2672 metadata: Some(meta),
2673 ink: None,
2674 intervention_code: None,
2675 intervention_reason: None,
2676 intervention_at: None,
2677 last_output_at: None,
2678 idle_since: None,
2679 idle_threshold_secs: None,
2680 worktree_path: Some("/home/user/.pulpo/worktrees/both-task".into()),
2681 runtime: Runtime::Tmux,
2682 created_at: Utc::now(),
2683 updated_at: Utc::now(),
2684 }];
2685 let output = format_sessions(&sessions);
2686 assert!(
2687 output.contains("[wt] [PR]"),
2688 "should show both indicators: {output}"
2689 );
2690 }
2691
2692 #[test]
2693 fn test_format_sessions_no_pr_without_metadata() {
2694 use chrono::Utc;
2695 use pulpo_common::session::SessionStatus;
2696 use uuid::Uuid;
2697
2698 let sessions = vec![Session {
2699 id: Uuid::nil(),
2700 name: "no-pr".into(),
2701 workdir: "/tmp".into(),
2702 command: "claude".into(),
2703 description: None,
2704 status: SessionStatus::Active,
2705 exit_code: None,
2706 backend_session_id: None,
2707 output_snapshot: None,
2708 metadata: None,
2709 ink: None,
2710 intervention_code: None,
2711 intervention_reason: None,
2712 intervention_at: None,
2713 last_output_at: None,
2714 idle_since: None,
2715 idle_threshold_secs: None,
2716 worktree_path: None,
2717 runtime: Runtime::Tmux,
2718 created_at: Utc::now(),
2719 updated_at: Utc::now(),
2720 }];
2721 let output = format_sessions(&sessions);
2722 assert!(
2723 !output.contains("[PR]"),
2724 "should not show PR indicator: {output}"
2725 );
2726 }
2727
2728 #[test]
2729 fn test_format_nodes() {
2730 use pulpo_common::node::NodeInfo;
2731 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2732
2733 let resp = PeersResponse {
2734 local: NodeInfo {
2735 name: "mac-mini".into(),
2736 hostname: "h".into(),
2737 os: "macos".into(),
2738 arch: "arm64".into(),
2739 cpus: 8,
2740 memory_mb: 16384,
2741 gpu: None,
2742 },
2743 peers: vec![PeerInfo {
2744 name: "win-pc".into(),
2745 address: "win-pc:7433".into(),
2746 status: PeerStatus::Online,
2747 node_info: None,
2748 session_count: Some(3),
2749 source: PeerSource::Configured,
2750 }],
2751 };
2752 let output = format_nodes(&resp);
2753 assert!(output.contains("mac-mini"));
2754 assert!(output.contains("(local)"));
2755 assert!(output.contains("win-pc"));
2756 assert!(output.contains('3'));
2757 }
2758
2759 #[test]
2760 fn test_format_nodes_no_session_count() {
2761 use pulpo_common::node::NodeInfo;
2762 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2763
2764 let resp = PeersResponse {
2765 local: NodeInfo {
2766 name: "local".into(),
2767 hostname: "h".into(),
2768 os: "linux".into(),
2769 arch: "x86_64".into(),
2770 cpus: 4,
2771 memory_mb: 8192,
2772 gpu: None,
2773 },
2774 peers: vec![PeerInfo {
2775 name: "peer".into(),
2776 address: "peer:7433".into(),
2777 status: PeerStatus::Offline,
2778 node_info: None,
2779 session_count: None,
2780 source: PeerSource::Configured,
2781 }],
2782 };
2783 let output = format_nodes(&resp);
2784 assert!(output.contains("offline"));
2785 let lines: Vec<&str> = output.lines().collect();
2787 assert!(lines[2].contains('-'));
2788 }
2789
2790 #[tokio::test]
2791 async fn test_execute_resume_connection_refused() {
2792 let cli = Cli {
2793 node: "localhost:1".into(),
2794 token: None,
2795 command: Some(Commands::Resume {
2796 name: "test".into(),
2797 }),
2798 path: None,
2799 };
2800 let result = execute(&cli).await;
2801 let err = result.unwrap_err().to_string();
2802 assert!(err.contains("Could not connect to pulpod"));
2803 }
2804
2805 #[tokio::test]
2806 async fn test_execute_spawn_connection_refused() {
2807 let cli = Cli {
2808 node: "localhost:1".into(),
2809 token: None,
2810 command: Some(Commands::Spawn {
2811 name: Some("test".into()),
2812 workdir: Some("/tmp".into()),
2813 ink: None,
2814 description: None,
2815 detach: true,
2816 idle_threshold: None,
2817 auto: false,
2818 worktree: false,
2819 runtime: None,
2820 secret: vec![],
2821 command: vec!["test".into()],
2822 }),
2823 path: None,
2824 };
2825 let result = execute(&cli).await;
2826 let err = result.unwrap_err().to_string();
2827 assert!(err.contains("Could not connect to pulpod"));
2828 }
2829
2830 #[tokio::test]
2831 async fn test_execute_stop_connection_refused() {
2832 let cli = Cli {
2833 node: "localhost:1".into(),
2834 token: None,
2835 command: Some(Commands::Stop {
2836 names: vec!["test".into()],
2837 purge: false,
2838 }),
2839 path: None,
2840 };
2841 let result = execute(&cli).await;
2842 let err = result.unwrap_err().to_string();
2843 assert!(err.contains("Could not connect to pulpod"));
2844 }
2845
2846 #[tokio::test]
2847 async fn test_execute_logs_connection_refused() {
2848 let cli = Cli {
2849 node: "localhost:1".into(),
2850 token: None,
2851 command: Some(Commands::Logs {
2852 name: "test".into(),
2853 lines: 50,
2854 follow: false,
2855 }),
2856 path: None,
2857 };
2858 let result = execute(&cli).await;
2859 let err = result.unwrap_err().to_string();
2860 assert!(err.contains("Could not connect to pulpod"));
2861 }
2862
2863 #[tokio::test]
2864 async fn test_friendly_error_connect() {
2865 let err = reqwest::Client::new()
2867 .get("http://127.0.0.1:1")
2868 .send()
2869 .await
2870 .unwrap_err();
2871 let friendly = friendly_error(&err, "test-node:1");
2872 let msg = friendly.to_string();
2873 assert!(
2874 msg.contains("Could not connect"),
2875 "Expected connect message, got: {msg}"
2876 );
2877 }
2878
2879 #[tokio::test]
2880 async fn test_friendly_error_other() {
2881 let err = reqwest::Client::new()
2883 .get("http://[::invalid::url")
2884 .send()
2885 .await
2886 .unwrap_err();
2887 let friendly = friendly_error(&err, "bad-host");
2888 let msg = friendly.to_string();
2889 assert!(
2890 msg.contains("Network error"),
2891 "Expected network error message, got: {msg}"
2892 );
2893 assert!(msg.contains("bad-host"));
2894 }
2895
2896 #[test]
2899 fn test_is_localhost_variants() {
2900 assert!(is_localhost("localhost:7433"));
2901 assert!(is_localhost("127.0.0.1:7433"));
2902 assert!(is_localhost("[::1]:7433"));
2903 assert!(is_localhost("::1"));
2904 assert!(is_localhost("localhost"));
2905 assert!(!is_localhost("mac-mini:7433"));
2906 assert!(!is_localhost("192.168.1.100:7433"));
2907 }
2908
2909 #[test]
2910 fn test_authed_get_with_token() {
2911 let client = reqwest::Client::new();
2912 let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2913 .build()
2914 .unwrap();
2915 let auth = req
2916 .headers()
2917 .get("authorization")
2918 .unwrap()
2919 .to_str()
2920 .unwrap();
2921 assert_eq!(auth, "Bearer tok");
2922 }
2923
2924 #[test]
2925 fn test_authed_get_without_token() {
2926 let client = reqwest::Client::new();
2927 let req = authed_get(&client, "http://h:1/api".into(), None)
2928 .build()
2929 .unwrap();
2930 assert!(req.headers().get("authorization").is_none());
2931 }
2932
2933 #[test]
2934 fn test_authed_post_with_token() {
2935 let client = reqwest::Client::new();
2936 let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2937 .build()
2938 .unwrap();
2939 let auth = req
2940 .headers()
2941 .get("authorization")
2942 .unwrap()
2943 .to_str()
2944 .unwrap();
2945 assert_eq!(auth, "Bearer secret");
2946 }
2947
2948 #[test]
2949 fn test_authed_post_without_token() {
2950 let client = reqwest::Client::new();
2951 let req = authed_post(&client, "http://h:1/api".into(), None)
2952 .build()
2953 .unwrap();
2954 assert!(req.headers().get("authorization").is_none());
2955 }
2956
2957 #[test]
2958 fn test_authed_delete_with_token() {
2959 let client = reqwest::Client::new();
2960 let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2961 .build()
2962 .unwrap();
2963 let auth = req
2964 .headers()
2965 .get("authorization")
2966 .unwrap()
2967 .to_str()
2968 .unwrap();
2969 assert_eq!(auth, "Bearer del-tok");
2970 }
2971
2972 #[test]
2973 fn test_authed_delete_without_token() {
2974 let client = reqwest::Client::new();
2975 let req = authed_delete(&client, "http://h:1/api".into(), None)
2976 .build()
2977 .unwrap();
2978 assert!(req.headers().get("authorization").is_none());
2979 }
2980
2981 #[tokio::test]
2982 async fn test_resolve_token_explicit() {
2983 let client = reqwest::Client::new();
2984 let token =
2985 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2986 assert_eq!(token, Some("my-tok".into()));
2987 }
2988
2989 #[tokio::test]
2990 async fn test_resolve_token_remote_no_explicit() {
2991 let client = reqwest::Client::new();
2992 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2993 assert_eq!(token, None);
2994 }
2995
2996 #[tokio::test]
2997 async fn test_resolve_token_localhost_auto_discover() {
2998 use axum::{Json, Router, routing::get};
2999
3000 let app = Router::new().route(
3001 "/api/v1/auth/token",
3002 get(|| async {
3003 Json(AuthTokenResponse {
3004 token: "discovered".into(),
3005 })
3006 }),
3007 );
3008 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3009 let addr = listener.local_addr().unwrap();
3010 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3011
3012 let node = format!("localhost:{}", addr.port());
3013 let base = base_url(&node);
3014 let client = reqwest::Client::new();
3015 let token = resolve_token(&client, &base, &node, None).await;
3016 assert_eq!(token, Some("discovered".into()));
3017 }
3018
3019 #[tokio::test]
3020 async fn test_discover_token_empty_returns_none() {
3021 use axum::{Json, Router, routing::get};
3022
3023 let app = Router::new().route(
3024 "/api/v1/auth/token",
3025 get(|| async {
3026 Json(AuthTokenResponse {
3027 token: String::new(),
3028 })
3029 }),
3030 );
3031 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3032 let addr = listener.local_addr().unwrap();
3033 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3034
3035 let base = format!("http://127.0.0.1:{}", addr.port());
3036 let client = reqwest::Client::new();
3037 assert_eq!(discover_token(&client, &base).await, None);
3038 }
3039
3040 #[tokio::test]
3041 async fn test_discover_token_unreachable_returns_none() {
3042 let client = reqwest::Client::new();
3043 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3044 }
3045
3046 #[test]
3047 fn test_cli_parse_with_token() {
3048 let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
3049 assert_eq!(cli.token, Some("my-secret".into()));
3050 }
3051
3052 #[test]
3053 fn test_cli_parse_without_token() {
3054 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
3055 assert_eq!(cli.token, None);
3056 }
3057
    // End-to-end check that an explicit `token` on `Cli` is sent as a Bearer
    // authorization header: the stub `/api/v1/sessions` handler itself asserts
    // on the header it receives and then answers with an empty JSON list.
    #[tokio::test]
    async fn test_execute_with_explicit_token_sends_header() {
        use axum::{Router, extract::Request, http::StatusCode, routing::get};

        let app = Router::new().route(
            "/api/v1/sessions",
            get(|req: Request| async move {
                // A missing or non-UTF-8 header is treated as "" so the
                // comparison below fails with a readable diff.
                let auth = req
                    .headers()
                    .get("authorization")
                    .and_then(|v| v.to_str().ok())
                    .unwrap_or("");
                assert_eq!(auth, "Bearer test-token");
                (StatusCode::OK, "[]".to_owned())
            }),
        );
        // Port 0 asks the OS for a free port; the chosen one is read back below.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
        let node = format!("127.0.0.1:{}", addr.port());

        let cli = Cli {
            node,
            token: Some("test-token".into()),
            command: Some(Commands::List { all: false }),
            path: None,
        };
        // The empty list from the stub must render as the placeholder text.
        let result = execute(&cli).await.unwrap();
        assert_eq!(result, "No sessions.");
    }
3088
3089 #[test]
3092 fn test_cli_parse_interventions() {
3093 let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
3094 assert!(matches!(
3095 &cli.command,
3096 Some(Commands::Interventions { name }) if name == "my-session"
3097 ));
3098 }
3099
3100 #[test]
3101 fn test_format_interventions_empty() {
3102 assert_eq!(format_interventions(&[]), "No intervention events.");
3103 }
3104
3105 #[test]
3106 fn test_format_interventions_with_data() {
3107 let events = vec![
3108 InterventionEventResponse {
3109 id: 1,
3110 session_id: "sess-1".into(),
3111 code: None,
3112 reason: "Memory exceeded threshold".into(),
3113 created_at: "2026-01-01T00:00:00Z".into(),
3114 },
3115 InterventionEventResponse {
3116 id: 2,
3117 session_id: "sess-1".into(),
3118 code: None,
3119 reason: "Idle for 10 minutes".into(),
3120 created_at: "2026-01-02T00:00:00Z".into(),
3121 },
3122 ];
3123 let output = format_interventions(&events);
3124 assert!(output.contains("ID"));
3125 assert!(output.contains("TIMESTAMP"));
3126 assert!(output.contains("REASON"));
3127 assert!(output.contains("Memory exceeded threshold"));
3128 assert!(output.contains("Idle for 10 minutes"));
3129 assert!(output.contains("2026-01-01T00:00:00Z"));
3130 }
3131
3132 #[tokio::test]
3133 async fn test_execute_interventions_empty() {
3134 let node = start_test_server().await;
3135 let cli = Cli {
3136 node,
3137 token: None,
3138 command: Some(Commands::Interventions {
3139 name: "my-session".into(),
3140 }),
3141 path: None,
3142 };
3143 let result = execute(&cli).await.unwrap();
3144 assert_eq!(result, "No intervention events.");
3145 }
3146
3147 #[tokio::test]
3148 async fn test_execute_interventions_with_data() {
3149 use axum::{Router, routing::get};
3150
3151 let app = Router::new().route(
3152 "/api/v1/sessions/{id}/interventions",
3153 get(|| async {
3154 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3155 .to_owned()
3156 }),
3157 );
3158 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3159 let addr = listener.local_addr().unwrap();
3160 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3161 let node = format!("127.0.0.1:{}", addr.port());
3162
3163 let cli = Cli {
3164 node,
3165 token: None,
3166 command: Some(Commands::Interventions {
3167 name: "test".into(),
3168 }),
3169 path: None,
3170 };
3171 let result = execute(&cli).await.unwrap();
3172 assert!(result.contains("OOM"));
3173 assert!(result.contains("2026-01-01T00:00:00Z"));
3174 }
3175
3176 #[tokio::test]
3177 async fn test_execute_interventions_connection_refused() {
3178 let cli = Cli {
3179 node: "localhost:1".into(),
3180 token: None,
3181 command: Some(Commands::Interventions {
3182 name: "test".into(),
3183 }),
3184 path: None,
3185 };
3186 let result = execute(&cli).await;
3187 let err = result.unwrap_err().to_string();
3188 assert!(err.contains("Could not connect to pulpod"));
3189 }
3190
3191 #[test]
3194 fn test_build_attach_command_tmux() {
3195 let cmd = build_attach_command("my-session");
3196 assert_eq!(cmd.get_program(), "tmux");
3197 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3198 assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
3199 }
3200
3201 #[test]
3202 fn test_build_attach_command_docker() {
3203 let cmd = build_attach_command("docker:pulpo-my-task");
3204 assert_eq!(cmd.get_program(), "docker");
3205 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3206 assert_eq!(args, vec!["exec", "-it", "pulpo-my-task", "/bin/sh"]);
3207 }
3208
3209 #[test]
3210 fn test_cli_parse_attach() {
3211 let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
3212 assert!(matches!(
3213 &cli.command,
3214 Some(Commands::Attach { name }) if name == "my-session"
3215 ));
3216 }
3217
3218 #[test]
3219 fn test_cli_parse_attach_alias() {
3220 let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
3221 assert!(matches!(
3222 &cli.command,
3223 Some(Commands::Attach { name }) if name == "my-session"
3224 ));
3225 }
3226
3227 #[tokio::test]
3228 async fn test_execute_attach_success() {
3229 let node = start_test_server().await;
3230 let cli = Cli {
3231 node,
3232 token: None,
3233 command: Some(Commands::Attach {
3234 name: "test-session".into(),
3235 }),
3236 path: None,
3237 };
3238 let result = execute(&cli).await.unwrap();
3239 assert!(result.contains("Detached from session test-session"));
3240 }
3241
3242 #[tokio::test]
3243 async fn test_execute_attach_with_backend_session_id() {
3244 use axum::{Router, routing::get};
3245 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3246 let app = Router::new().route(
3247 "/api/v1/sessions/{id}",
3248 get(move || async move { session_json.to_owned() }),
3249 );
3250 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3251 let addr = listener.local_addr().unwrap();
3252 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3253
3254 let cli = Cli {
3255 node: format!("127.0.0.1:{}", addr.port()),
3256 token: None,
3257 command: Some(Commands::Attach {
3258 name: "my-session".into(),
3259 }),
3260 path: None,
3261 };
3262 let result = execute(&cli).await.unwrap();
3263 assert!(result.contains("Detached from session my-session"));
3264 }
3265
3266 #[tokio::test]
3267 async fn test_execute_attach_connection_refused() {
3268 let cli = Cli {
3269 node: "localhost:1".into(),
3270 token: None,
3271 command: Some(Commands::Attach {
3272 name: "test-session".into(),
3273 }),
3274 path: None,
3275 };
3276 let result = execute(&cli).await;
3277 let err = result.unwrap_err().to_string();
3278 assert!(err.contains("Could not connect to pulpod"));
3279 }
3280
3281 #[tokio::test]
3282 async fn test_execute_attach_error_response() {
3283 use axum::{Router, http::StatusCode, routing::get};
3284 let app = Router::new().route(
3285 "/api/v1/sessions/{id}",
3286 get(|| async {
3287 (
3288 StatusCode::NOT_FOUND,
3289 r#"{"error":"session not found"}"#.to_owned(),
3290 )
3291 }),
3292 );
3293 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3294 let addr = listener.local_addr().unwrap();
3295 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3296
3297 let cli = Cli {
3298 node: format!("127.0.0.1:{}", addr.port()),
3299 token: None,
3300 command: Some(Commands::Attach {
3301 name: "nonexistent".into(),
3302 }),
3303 path: None,
3304 };
3305 let result = execute(&cli).await;
3306 let err = result.unwrap_err().to_string();
3307 assert!(err.contains("session not found"));
3308 }
3309
3310 #[tokio::test]
3311 async fn test_execute_attach_stale_session() {
3312 use axum::{Router, routing::get};
3313 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3314 let app = Router::new().route(
3315 "/api/v1/sessions/{id}",
3316 get(move || async move { session_json.to_owned() }),
3317 );
3318 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3319 let addr = listener.local_addr().unwrap();
3320 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3321
3322 let cli = Cli {
3323 node: format!("127.0.0.1:{}", addr.port()),
3324 token: None,
3325 command: Some(Commands::Attach {
3326 name: "stale-sess".into(),
3327 }),
3328 path: None,
3329 };
3330 let result = execute(&cli).await;
3331 let err = result.unwrap_err().to_string();
3332 assert!(err.contains("lost"));
3333 assert!(err.contains("pulpo resume"));
3334 }
3335
3336 #[tokio::test]
3337 async fn test_execute_attach_dead_session() {
3338 use axum::{Router, routing::get};
3339 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3340 let app = Router::new().route(
3341 "/api/v1/sessions/{id}",
3342 get(move || async move { session_json.to_owned() }),
3343 );
3344 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3345 let addr = listener.local_addr().unwrap();
3346 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3347
3348 let cli = Cli {
3349 node: format!("127.0.0.1:{}", addr.port()),
3350 token: None,
3351 command: Some(Commands::Attach {
3352 name: "dead-sess".into(),
3353 }),
3354 path: None,
3355 };
3356 let result = execute(&cli).await;
3357 let err = result.unwrap_err().to_string();
3358 assert!(err.contains("stopped"));
3359 assert!(err.contains("cannot attach"));
3360 }
3361
3362 #[test]
3365 fn test_cli_parse_alias_spawn() {
3366 let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
3367 assert!(matches!(&cli.command, Some(Commands::Spawn { .. })));
3368 }
3369
3370 #[test]
3371 fn test_cli_parse_alias_list() {
3372 let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
3373 assert!(matches!(&cli.command, Some(Commands::List { all: false })));
3374 }
3375
3376 #[test]
3377 fn test_cli_parse_list_all() {
3378 let cli = Cli::try_parse_from(["pulpo", "ls", "-a"]).unwrap();
3379 assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3380
3381 let cli = Cli::try_parse_from(["pulpo", "list", "--all"]).unwrap();
3382 assert!(matches!(&cli.command, Some(Commands::List { all: true })));
3383 }
3384
3385 #[test]
3386 fn test_cli_parse_alias_logs() {
3387 let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
3388 assert!(matches!(
3389 &cli.command,
3390 Some(Commands::Logs { name, .. }) if name == "my-session"
3391 ));
3392 }
3393
3394 #[test]
3395 fn test_cli_parse_alias_stop() {
3396 let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
3397 assert!(matches!(
3398 &cli.command,
3399 Some(Commands::Stop { names, purge }) if names == &["my-session"] && !purge
3400 ));
3401 }
3402
3403 #[test]
3404 fn test_cli_parse_alias_resume() {
3405 let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
3406 assert!(matches!(
3407 &cli.command,
3408 Some(Commands::Resume { name }) if name == "my-session"
3409 ));
3410 }
3411
3412 #[test]
3413 fn test_cli_parse_alias_nodes() {
3414 let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
3415 assert!(matches!(&cli.command, Some(Commands::Nodes)));
3416 }
3417
3418 #[test]
3419 fn test_cli_parse_alias_interventions() {
3420 let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
3421 assert!(matches!(
3422 &cli.command,
3423 Some(Commands::Interventions { name }) if name == "my-session"
3424 ));
3425 }
3426
3427 #[test]
3428 fn test_api_error_json() {
3429 let err = api_error("{\"error\":\"session not found: foo\"}");
3430 assert_eq!(err.to_string(), "session not found: foo");
3431 }
3432
3433 #[test]
3434 fn test_api_error_plain_text() {
3435 let err = api_error("plain text error");
3436 assert_eq!(err.to_string(), "plain text error");
3437 }
3438
3439 #[test]
3442 fn test_diff_output_empty_prev() {
3443 assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
3444 }
3445
3446 #[test]
3447 fn test_diff_output_identical() {
3448 assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
3449 }
3450
3451 #[test]
3452 fn test_diff_output_new_lines_appended() {
3453 let prev = "line1\nline2";
3454 let new = "line1\nline2\nline3\nline4";
3455 assert_eq!(diff_output(prev, new), "line3\nline4");
3456 }
3457
3458 #[test]
3459 fn test_diff_output_scrolled_window() {
3460 let prev = "line1\nline2\nline3";
3462 let new = "line2\nline3\nline4";
3463 assert_eq!(diff_output(prev, new), "line4");
3464 }
3465
3466 #[test]
3467 fn test_diff_output_completely_different() {
3468 let prev = "aaa\nbbb";
3469 let new = "xxx\nyyy";
3470 assert_eq!(diff_output(prev, new), "xxx\nyyy");
3471 }
3472
3473 #[test]
3474 fn test_diff_output_last_line_matches_but_overlap_fails() {
3475 let prev = "aaa\ncommon";
3477 let new = "zzz\ncommon\nnew_line";
3478 assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
3482 }
3483
3484 #[test]
3485 fn test_diff_output_new_empty() {
3486 assert_eq!(diff_output("line1", ""), "");
3487 }
3488
    // Starts a stub server for the follow-mode log tests and returns its base
    // URL ("http://127.0.0.1:<port>").
    //
    // The output endpoint changes its answer on successive calls, driven by a
    // shared call counter: first two lines, then the same lines plus a third,
    // and from the third call onwards a scrolled window ending with the
    // "[pulpo] Agent exited" marker line. The session endpoint always reports
    // a session with status "active".
    async fn start_follow_test_server() -> String {
        use axum::{Router, extract::Path, extract::Query, routing::get};
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Counts requests to the output endpoint; the handler gets its own
        // clone of the handle.
        let call_count = Arc::new(AtomicUsize::new(0));
        let output_count = call_count.clone();

        let app = Router::new()
            .route(
                "/api/v1/sessions/{id}/output",
                get(
                    move |_path: Path<String>,
                          _query: Query<std::collections::HashMap<String, String>>| {
                        let count = output_count.clone();
                        async move {
                            // `fetch_add` returns the value before the
                            // increment, so the first request sees 0.
                            let n = count.fetch_add(1, Ordering::SeqCst);
                            let output = match n {
                                0 => "line1\nline2".to_owned(),
                                1 => "line1\nline2\nline3".to_owned(),
                                _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
                            };
                            // `json!` renders the text as a quoted, escaped
                            // JSON string inside the hand-built object.
                            format!(r#"{{"output":{}}}"#, serde_json::json!(output))
                        }
                    },
                ),
            )
            .route(
                "/api/v1/sessions/{id}",
                get(|_path: Path<String>| async {
                    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
                }),
            );

        // Port 0 asks the OS for a free port; the chosen one is read back below.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
        format!("http://127.0.0.1:{}", addr.port())
    }
3532
3533 #[tokio::test]
3534 async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
3535 let base = start_follow_test_server().await;
3536 let client = reqwest::Client::new();
3537 let mut buf = Vec::new();
3538
3539 follow_logs(&client, &base, "test", 100, None, &mut buf)
3540 .await
3541 .unwrap();
3542
3543 let output = String::from_utf8(buf).unwrap();
3544 assert!(output.contains("line1"));
3546 assert!(output.contains("line2"));
3547 assert!(output.contains("line3"));
3548 assert!(output.contains("line4"));
3549 assert!(output.contains("[pulpo] Agent exited"));
3550 }
3551
3552 #[tokio::test]
3553 async fn test_execute_logs_follow_success() {
3554 let base = start_follow_test_server().await;
3555 let node = base.strip_prefix("http://").unwrap().to_owned();
3557
3558 let cli = Cli {
3559 node,
3560 token: None,
3561 command: Some(Commands::Logs {
3562 name: "test".into(),
3563 lines: 100,
3564 follow: true,
3565 }),
3566 path: None,
3567 };
3568 let result = execute(&cli).await.unwrap();
3570 assert_eq!(result, "");
3571 }
3572
3573 #[tokio::test]
3574 async fn test_execute_logs_follow_connection_refused() {
3575 let cli = Cli {
3576 node: "localhost:1".into(),
3577 token: None,
3578 command: Some(Commands::Logs {
3579 name: "test".into(),
3580 lines: 50,
3581 follow: true,
3582 }),
3583 path: None,
3584 };
3585 let result = execute(&cli).await;
3586 let err = result.unwrap_err().to_string();
3587 assert!(
3588 err.contains("Could not connect to pulpod"),
3589 "Expected friendly error, got: {err}"
3590 );
3591 }
3592
3593 #[tokio::test]
3594 async fn test_follow_logs_exits_on_dead() {
3595 use axum::{Router, extract::Path, extract::Query, routing::get};
3596
3597 let app = Router::new()
3598 .route(
3599 "/api/v1/sessions/{id}/output",
3600 get(
3601 |_path: Path<String>,
3602 _query: Query<std::collections::HashMap<String, String>>| async {
3603 r#"{"output":"some output"}"#.to_owned()
3604 },
3605 ),
3606 )
3607 .route(
3608 "/api/v1/sessions/{id}",
3609 get(|_path: Path<String>| async {
3610 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"stopped","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3611 }),
3612 );
3613
3614 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3615 let addr = listener.local_addr().unwrap();
3616 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3617 let base = format!("http://127.0.0.1:{}", addr.port());
3618
3619 let client = reqwest::Client::new();
3620 let mut buf = Vec::new();
3621 follow_logs(&client, &base, "test", 100, None, &mut buf)
3622 .await
3623 .unwrap();
3624
3625 let output = String::from_utf8(buf).unwrap();
3626 assert!(output.contains("some output"));
3627 }
3628
3629 #[tokio::test]
3630 async fn test_follow_logs_exits_on_stale() {
3631 use axum::{Router, extract::Path, extract::Query, routing::get};
3632
3633 let app = Router::new()
3634 .route(
3635 "/api/v1/sessions/{id}/output",
3636 get(
3637 |_path: Path<String>,
3638 _query: Query<std::collections::HashMap<String, String>>| async {
3639 r#"{"output":"stale output"}"#.to_owned()
3640 },
3641 ),
3642 )
3643 .route(
3644 "/api/v1/sessions/{id}",
3645 get(|_path: Path<String>| async {
3646 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3647 }),
3648 );
3649
3650 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3651 let addr = listener.local_addr().unwrap();
3652 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3653 let base = format!("http://127.0.0.1:{}", addr.port());
3654
3655 let client = reqwest::Client::new();
3656 let mut buf = Vec::new();
3657 follow_logs(&client, &base, "test", 100, None, &mut buf)
3658 .await
3659 .unwrap();
3660
3661 let output = String::from_utf8(buf).unwrap();
3662 assert!(output.contains("stale output"));
3663 }
3664
3665 #[tokio::test]
3666 async fn test_execute_logs_follow_non_reqwest_error() {
3667 use axum::{Router, extract::Path, extract::Query, routing::get};
3668
3669 let app = Router::new()
3671 .route(
3672 "/api/v1/sessions/{id}/output",
3673 get(
3674 |_path: Path<String>,
3675 _query: Query<std::collections::HashMap<String, String>>| async {
3676 r#"{"output":"initial"}"#.to_owned()
3677 },
3678 ),
3679 )
3680 .route(
3681 "/api/v1/sessions/{id}",
3682 get(|_path: Path<String>| async { "not valid json".to_owned() }),
3683 );
3684
3685 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3686 let addr = listener.local_addr().unwrap();
3687 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3688 let node = format!("127.0.0.1:{}", addr.port());
3689
3690 let cli = Cli {
3691 node,
3692 token: None,
3693 command: Some(Commands::Logs {
3694 name: "test".into(),
3695 lines: 100,
3696 follow: true,
3697 }),
3698 path: None,
3699 };
3700 let err = execute(&cli).await.unwrap_err();
3701 let msg = err.to_string();
3703 assert!(
3704 msg.contains("expected ident"),
3705 "Expected serde parse error, got: {msg}"
3706 );
3707 }
3708
3709 #[tokio::test]
3710 async fn test_fetch_session_status_connection_error() {
3711 let client = reqwest::Client::new();
3712 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3713 assert!(result.is_err());
3714 }
3715
/// An empty schedule list renders as the fixed "No schedules." message.
#[test]
fn test_format_schedules_empty() {
    let rendered = format_schedules(&[]);
    assert_eq!(rendered, "No schedules.");
}
3722
/// An enabled schedule with null `last_run_at` and `target_node` renders its
/// name and cron expression, plus the strings `local`, `yes` and a `-`
/// (presumably the placeholders for the null fields and the enabled flag).
#[test]
fn test_format_schedules_with_entries() {
    let entry = serde_json::json!({
        "name": "nightly",
        "cron": "0 3 * * *",
        "enabled": true,
        "last_run_at": null,
        "target_node": null
    });
    let rendered = format_schedules(&[entry]);

    for fragment in ["nightly", "0 3 * * *", "local", "yes"] {
        assert!(rendered.contains(fragment));
    }
    assert!(rendered.contains('-'));
}
3739
/// A disabled schedule with a target node and a last-run timestamp renders
/// `no`, the node name, and the timestamp up to the minute.
#[test]
fn test_format_schedules_disabled_entry() {
    let entry = serde_json::json!({
        "name": "weekly",
        "cron": "0 0 * * 0",
        "enabled": false,
        "last_run_at": "2026-03-18T03:00:00Z",
        "target_node": "gpu-box"
    });
    let rendered = format_schedules(&[entry]);

    for fragment in ["weekly", "no", "gpu-box", "2026-03-18T03:00"] {
        assert!(rendered.contains(fragment));
    }
}
3755
/// A non-empty schedule table includes all five column headers.
#[test]
fn test_format_schedules_header() {
    let entry = serde_json::json!({
        "name": "test",
        "cron": "* * * * *",
        "enabled": true,
        "last_run_at": null,
        "target_node": null
    });
    let rendered = format_schedules(&[entry]);

    for header in ["NAME", "CRON", "ENABLED", "LAST RUN", "NODE"] {
        assert!(rendered.contains(header));
    }
}
3772
/// `schedule add <name> <cron> --workdir … -- <command…>` parses into
/// `ScheduleAction::Add` carrying the given name and cron expression.
#[test]
fn test_cli_parse_schedule_add() {
    let argv = [
        "pulpo",
        "schedule",
        "add",
        "nightly",
        "0 3 * * *",
        "--workdir",
        "/repo",
        "--",
        "claude",
        "-p",
        "review",
    ];
    let cli = Cli::try_parse_from(argv).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Add { name, cron, .. },
    }) = &cli.command
    else {
        panic!("expected `schedule add`, got {:?}", cli.command);
    };
    assert_eq!(name, "nightly");
    assert_eq!(cron, "0 3 * * *");
}
3798
/// `--node` given after `schedule add` is captured on the `Add` action itself.
#[test]
fn test_cli_parse_schedule_add_with_node() {
    let argv = [
        "pulpo",
        "schedule",
        "add",
        "nightly",
        "0 3 * * *",
        "--workdir",
        "/repo",
        "--node",
        "gpu-box",
        "--",
        "claude",
    ];
    let cli = Cli::try_parse_from(argv).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Add { node, .. },
    }) = &cli.command
    else {
        panic!("expected `schedule add`, got {:?}", cli.command);
    };
    assert_eq!(node.as_deref(), Some("gpu-box"));
}
3822
/// `schedule install` is accepted and parses as `ScheduleAction::Add`.
#[test]
fn test_cli_parse_schedule_add_install_alias() {
    let argv = ["pulpo", "schedule", "install", "nightly", "0 3 * * *"];
    let cli = Cli::try_parse_from(argv).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Add { name, .. },
    }) = &cli.command
    else {
        panic!("expected `schedule add`, got {:?}", cli.command);
    };
    assert_eq!(name, "nightly");
}
3834
/// `schedule list` parses into `ScheduleAction::List`.
#[test]
fn test_cli_parse_schedule_list() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Some(Commands::Schedule {
            action: ScheduleAction::List
        })
    );
    assert!(is_list);
}
3845
/// `schedule remove <name>` parses into `ScheduleAction::Remove` with that name.
#[test]
fn test_cli_parse_schedule_remove() {
    let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Remove { name },
    }) = &cli.command
    else {
        panic!("expected `schedule remove`, got {:?}", cli.command);
    };
    assert_eq!(name, "nightly");
}
3856
/// `schedule pause <name>` parses into `ScheduleAction::Pause` with that name.
#[test]
fn test_cli_parse_schedule_pause() {
    let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Pause { name },
    }) = &cli.command
    else {
        panic!("expected `schedule pause`, got {:?}", cli.command);
    };
    assert_eq!(name, "nightly");
}
3867
/// `schedule resume <name>` parses into `ScheduleAction::Resume` with that name.
#[test]
fn test_cli_parse_schedule_resume() {
    let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Resume { name },
    }) = &cli.command
    else {
        panic!("expected `schedule resume`, got {:?}", cli.command);
    };
    assert_eq!(name, "nightly");
}
3878
/// The `sched` alias selects the `schedule` subcommand.
#[test]
fn test_cli_parse_schedule_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Some(Commands::Schedule {
            action: ScheduleAction::List
        })
    );
    assert!(is_list);
}
3889
/// `schedule ls` is an alias for `schedule list`.
#[test]
fn test_cli_parse_schedule_list_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Some(Commands::Schedule {
            action: ScheduleAction::List
        })
    );
    assert!(is_list);
}
3900
/// `schedule rm <name>` is an alias for `schedule remove <name>`.
#[test]
fn test_cli_parse_schedule_remove_alias() {
    let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Remove { name },
    }) = &cli.command
    else {
        panic!("expected `schedule remove`, got {:?}", cli.command);
    };
    assert_eq!(name, "nightly");
}
3911
3912 #[tokio::test]
3913 async fn test_execute_schedule_list_via_execute() {
3914 let node = start_test_server().await;
3915 let cli = Cli {
3916 node,
3917 token: None,
3918 command: Some(Commands::Schedule {
3919 action: ScheduleAction::List,
3920 }),
3921 path: None,
3922 };
3923 let result = execute(&cli).await.unwrap();
3924 #[cfg(coverage)]
3926 assert!(result.is_empty());
3927 #[cfg(not(coverage))]
3928 assert_eq!(result, "No schedules.");
3929 }
3930
/// The `Debug` rendering of `ScheduleAction::List` is the bare variant name.
#[test]
fn test_schedule_action_debug() {
    let action = ScheduleAction::List;
    let debug_repr = format!("{action:?}");
    assert_eq!(debug_repr, "List");
}
3936
/// `send <name> <text>` is an alias of the `input` subcommand.
#[test]
fn test_cli_parse_send_alias() {
    let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();

    let Some(Commands::Input { name, text }) = &cli.command else {
        panic!("expected `input`, got {:?}", cli.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("y"));
}
3945
/// A bare `spawn` parses with no session name and an empty command.
#[test]
fn test_cli_parse_spawn_no_name() {
    let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();

    let Some(Commands::Spawn { name, command, .. }) = &cli.command else {
        panic!("expected `spawn`, got {:?}", cli.command);
    };
    assert!(name.is_none());
    assert!(command.is_empty());
}
3954
/// Everything after `--` becomes the spawn command even when no name is given.
#[test]
fn test_cli_parse_spawn_optional_name_with_command() {
    let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();

    let Some(Commands::Spawn { name, command, .. }) = &cli.command else {
        panic!("expected `spawn`, got {:?}", cli.command);
    };
    assert!(name.is_none());
    assert_eq!(command, &["echo", "hello"]);
}
3964
/// A lone positional argument is stored in `path` and no subcommand is set.
#[test]
fn test_cli_parse_path_shortcut() {
    let parsed = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
    let Cli { command, path, .. } = &parsed;
    assert!(command.is_none());
    assert_eq!(path.as_deref(), Some("/tmp/my-repo"));
}
3971
/// With no arguments at all, neither a subcommand nor a path is set.
#[test]
fn test_cli_parse_no_args() {
    let parsed = Cli::try_parse_from(["pulpo"]).unwrap();
    let Cli { command, path, .. } = &parsed;
    assert!(command.is_none());
    assert!(path.is_none());
}
3978
/// A plain directory path maps to its final component.
#[test]
fn test_derive_session_name_simple() {
    let derived = derive_session_name("/home/user/my-repo");
    assert_eq!(derived, "my-repo");
}
3983
/// Upper case, spaces and underscores in the final component come out as
/// lower case with dashes.
#[test]
fn test_derive_session_name_with_special_chars() {
    let derived = derive_session_name("/home/user/My Repo_v2");
    assert_eq!(derived, "my-repo-v2");
}
3988
/// The filesystem root yields the name `session`.
#[test]
fn test_derive_session_name_root() {
    let derived = derive_session_name("/");
    assert_eq!(derived, "session");
}
3993
/// A leading dot on the final component is dropped from the derived name.
#[test]
fn test_derive_session_name_dots() {
    let derived = derive_session_name("/home/user/.hidden");
    assert_eq!(derived, "hidden");
}
3998
/// An absolute path is returned unchanged.
#[test]
fn test_resolve_path_absolute() {
    let resolved = resolve_path("/tmp/repo");
    assert_eq!(resolved, "/tmp/repo");
}
4003
/// A relative path resolves to something absolute that still ends with the
/// original component.
#[test]
fn test_resolve_path_relative() {
    let absolute = resolve_path("my-repo");
    let keeps_component = absolute.ends_with("my-repo");
    let is_rooted = absolute.starts_with('/');
    assert!(keeps_component);
    assert!(is_rooted);
}
4010
4011 #[tokio::test]
4012 async fn test_execute_no_args_shows_help() {
4013 let node = start_test_server().await;
4014 let cli = Cli {
4015 node,
4016 token: None,
4017 path: None,
4018 command: None,
4019 };
4020 let result = execute(&cli).await.unwrap();
4021 assert!(
4022 result.is_empty(),
4023 "no-args should return empty string after printing help"
4024 );
4025 }
4026
4027 #[tokio::test]
4028 async fn test_execute_path_shortcut() {
4029 let node = start_test_server().await;
4030 let cli = Cli {
4031 node,
4032 token: None,
4033 path: Some("/tmp".into()),
4034 command: None,
4035 };
4036 let result = execute(&cli).await.unwrap();
4037 assert!(result.contains("Detached from session"));
4038 }
4039
4040 #[tokio::test]
4041 async fn test_deduplicate_session_name_no_conflict() {
4042 let base = "http://127.0.0.1:1";
4044 let client = reqwest::Client::new();
4045 let name = deduplicate_session_name(&client, base, "fresh", None).await;
4046 assert_eq!(name, "fresh");
4047 }
4048
4049 #[tokio::test]
4050 async fn test_deduplicate_session_name_with_conflict() {
4051 use axum::{Router, routing::get};
4052 use std::sync::atomic::{AtomicU32, Ordering};
4053
4054 let call_count = std::sync::Arc::new(AtomicU32::new(0));
4055 let counter = call_count.clone();
4056 let app = Router::new()
4057 .route(
4058 "/api/v1/sessions/{id}",
4059 get(move || {
4060 let c = counter.clone();
4061 async move {
4062 let n = c.fetch_add(1, Ordering::SeqCst);
4063 if n == 0 {
4064 (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4066 } else {
4067 (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4069 }
4070 }
4071 }),
4072 )
4073 .route(
4074 "/api/v1/peers",
4075 get(|| async {
4076 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4077 }),
4078 );
4079 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4080 let addr = listener.local_addr().unwrap();
4081 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4082 let base = format!("http://127.0.0.1:{}", addr.port());
4083 let client = reqwest::Client::new();
4084 let name = deduplicate_session_name(&client, &base, "repo", None).await;
4085 assert_eq!(name, "repo-2");
4086 }
4087
/// Values that already carry a port need no resolution; bare names do.
#[test]
fn test_node_needs_resolution() {
    let with_port = [
        "localhost:7433",
        "mac-mini:7433",
        "10.0.0.1:7433",
        "[::1]:7433",
    ];
    for node in with_port {
        assert!(!node_needs_resolution(node));
    }

    let bare_names = ["mac-mini", "linux-server", "localhost"];
    for node in bare_names {
        assert!(node_needs_resolution(node));
    }
}
4100
4101 #[tokio::test]
4102 async fn test_resolve_node_with_port() {
4103 let client = reqwest::Client::new();
4104 let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4105 assert_eq!(addr, "mac-mini:7433");
4106 assert!(token.is_none());
4107 }
4108
4109 #[tokio::test]
4110 async fn test_resolve_node_fallback_appends_port() {
4111 let client = reqwest::Client::new();
4114 let (addr, token) = resolve_node(&client, "unknown-host").await;
4115 assert_eq!(addr, "unknown-host:7433");
4116 assert!(token.is_none());
4117 }
4118
4119 #[cfg(not(coverage))]
4120 #[tokio::test]
4121 async fn test_resolve_node_finds_peer() {
4122 use axum::{Router, routing::get};
4123
4124 let app = Router::new()
4125 .route(
4126 "/api/v1/peers",
4127 get(|| async {
4128 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4129 }),
4130 )
4131 .route(
4132 "/api/v1/config",
4133 get(|| async {
4134 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4135 }),
4136 );
4137
4138 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4140 return;
4141 };
4142 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4143
4144 let client = reqwest::Client::new();
4145 let (addr, token) = resolve_node(&client, "mac-mini").await;
4146 assert_eq!(addr, "10.0.0.5:7433");
4147 assert_eq!(token, Some("peer-secret".into()));
4148 }
4149
4150 #[cfg(not(coverage))]
4151 #[tokio::test]
4152 async fn test_resolve_node_peer_no_token() {
4153 use axum::{Router, routing::get};
4154
4155 let app = Router::new()
4156 .route(
4157 "/api/v1/peers",
4158 get(|| async {
4159 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4160 }),
4161 )
4162 .route(
4163 "/api/v1/config",
4164 get(|| async {
4165 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4166 }),
4167 );
4168
4169 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4170 return; };
4172 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4173
4174 let client = reqwest::Client::new();
4175 let (addr, token) = resolve_node(&client, "test-peer").await;
4176 assert_eq!(addr, "10.0.0.9:7433");
4177 assert!(token.is_none()); }
4179
4180 #[tokio::test]
4181 async fn test_execute_with_peer_name_resolution() {
4182 let cli = Cli {
4186 node: "nonexistent-peer".into(),
4187 token: None,
4188 command: Some(Commands::List { all: false }),
4189 path: None,
4190 };
4191 let result = execute(&cli).await;
4192 assert!(result.is_err());
4194 }
4195
/// `spawn --auto` sets the `auto` flag.
#[test]
fn test_cli_parse_spawn_auto() {
    let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();

    let Some(Commands::Spawn { auto, .. }) = &cli.command else {
        panic!("expected `spawn`, got {:?}", cli.command);
    };
    assert!(*auto);
}
4206
/// Without `--auto`, the `auto` flag defaults to false.
#[test]
fn test_cli_parse_spawn_auto_default() {
    let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();

    let Some(Commands::Spawn { auto, .. }) = &cli.command else {
        panic!("expected `spawn`, got {:?}", cli.command);
    };
    assert!(!*auto);
}
4215
4216 #[tokio::test]
4217 async fn test_select_best_node_coverage_stub() {
4218 let client = reqwest::Client::new();
4220 let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4223 }
4224
4225 #[cfg(not(coverage))]
4226 #[tokio::test]
4227 async fn test_select_best_node_picks_least_loaded() {
4228 use axum::{Router, routing::get};
4229
4230 let app = Router::new().route(
4231 "/api/v1/peers",
4232 get(|| async {
4233 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4234 }),
4235 );
4236 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4237 let addr = listener.local_addr().unwrap();
4238 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4239 let base = format!("http://127.0.0.1:{}", addr.port());
4240
4241 let client = reqwest::Client::new();
4242 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4243 assert_eq!(name, "idle");
4247 assert_eq!(addr, "idle:7433");
4248 }
4249
4250 #[cfg(not(coverage))]
4251 #[tokio::test]
4252 async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4253 use axum::{Router, routing::get};
4254
4255 let app = Router::new().route(
4256 "/api/v1/peers",
4257 get(|| async {
4258 r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4259 }),
4260 );
4261 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4262 let addr = listener.local_addr().unwrap();
4263 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4264 let base = format!("http://127.0.0.1:{}", addr.port());
4265
4266 let client = reqwest::Client::new();
4267 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4268 assert_eq!(name, "my-mac");
4269 assert_eq!(addr, "localhost:7433");
4270 }
4271
4272 #[cfg(not(coverage))]
4273 #[tokio::test]
4274 async fn test_select_best_node_empty_peers_falls_back_to_local() {
4275 use axum::{Router, routing::get};
4276
4277 let app = Router::new().route(
4278 "/api/v1/peers",
4279 get(|| async {
4280 r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4281 }),
4282 );
4283 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4284 let addr = listener.local_addr().unwrap();
4285 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4286 let base = format!("http://127.0.0.1:{}", addr.port());
4287
4288 let client = reqwest::Client::new();
4289 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4290 assert_eq!(name, "solo");
4291 assert_eq!(addr, "localhost:7433");
4292 }
4293
4294 #[cfg(not(coverage))]
4295 #[tokio::test]
4296 async fn test_execute_spawn_auto_selects_node() {
4297 use axum::{
4298 Router,
4299 http::StatusCode,
4300 routing::{get, post},
4301 };
4302
4303 let create_json = test_create_response_json();
4304
4305 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4307 let addr = listener.local_addr().unwrap();
4308 let node = format!("127.0.0.1:{}", addr.port());
4309 let peer_addr = node.clone();
4310
4311 let app = Router::new()
4312 .route(
4313 "/api/v1/peers",
4314 get(move || {
4315 let peer_addr = peer_addr.clone();
4316 async move {
4317 format!(
4318 r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
4319 )
4320 }
4321 }),
4322 )
4323 .route(
4324 "/api/v1/sessions",
4325 post(move || async move {
4326 (StatusCode::CREATED, create_json.clone())
4327 }),
4328 );
4329
4330 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4331
4332 let cli = Cli {
4333 node,
4334 token: None,
4335 command: Some(Commands::Spawn {
4336 name: Some("test".into()),
4337 workdir: Some("/tmp/repo".into()),
4338 ink: None,
4339 description: None,
4340 detach: true,
4341 idle_threshold: None,
4342 auto: true,
4343 worktree: false,
4344 runtime: None,
4345 secret: vec![],
4346 command: vec!["echo".into(), "hello".into()],
4347 }),
4348 path: None,
4349 };
4350 let result = execute(&cli).await.unwrap();
4351 assert!(result.contains("Created session"));
4352 }
4353
4354 #[cfg(not(coverage))]
4355 #[tokio::test]
4356 async fn test_select_best_node_peer_no_session_count() {
4357 use axum::{Router, routing::get};
4358
4359 let app = Router::new().route(
4360 "/api/v1/peers",
4361 get(|| async {
4362 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4363 }),
4364 );
4365 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4366 let addr = listener.local_addr().unwrap();
4367 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4368 let base = format!("http://127.0.0.1:{}", addr.port());
4369
4370 let client = reqwest::Client::new();
4371 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4372 assert_eq!(name, "fresh");
4374 assert_eq!(addr, "fresh:7433");
4375 }
4376
/// `secret set <name> <value>` parses into `SecretAction::Set` with no `env`.
#[test]
fn test_cli_parse_secret_set() {
    let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_TOKEN", "abc123"]).unwrap();

    let Some(Commands::Secret {
        action: SecretAction::Set { name, value, env },
    }) = &cli.command
    else {
        panic!("expected `secret set`, got {:?}", cli.command);
    };
    assert_eq!(name, "MY_TOKEN");
    assert_eq!(value, "abc123");
    assert!(env.is_none());
}
4388
/// `secret list` parses into `SecretAction::List`.
#[test]
fn test_cli_parse_secret_list() {
    let parsed = Cli::try_parse_from(["pulpo", "secret", "list"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Some(Commands::Secret {
            action: SecretAction::List
        })
    );
    assert!(is_list);
}
4399
/// `secret ls` is an alias for `secret list`.
#[test]
fn test_cli_parse_secret_list_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "secret", "ls"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Some(Commands::Secret {
            action: SecretAction::List
        })
    );
    assert!(is_list);
}
4410
/// `secret delete <name>` parses into `SecretAction::Delete` with that name.
#[test]
fn test_cli_parse_secret_delete() {
    let cli = Cli::try_parse_from(["pulpo", "secret", "delete", "MY_TOKEN"]).unwrap();

    let Some(Commands::Secret {
        action: SecretAction::Delete { name },
    }) = &cli.command
    else {
        panic!("expected `secret delete`, got {:?}", cli.command);
    };
    assert_eq!(name, "MY_TOKEN");
}
4420
/// `secret rm <name>` is an alias for `secret delete <name>`.
#[test]
fn test_cli_parse_secret_delete_alias() {
    let cli = Cli::try_parse_from(["pulpo", "secret", "rm", "MY_TOKEN"]).unwrap();

    let Some(Commands::Secret {
        action: SecretAction::Delete { name },
    }) = &cli.command
    else {
        panic!("expected `secret delete`, got {:?}", cli.command);
    };
    assert_eq!(name, "MY_TOKEN");
}
4430
/// The `sec` alias selects the `secret` subcommand.
#[test]
fn test_cli_parse_secret_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "sec", "list"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Some(Commands::Secret {
            action: SecretAction::List
        })
    );
    assert!(is_list);
}
4441
/// An empty secret list renders as the fixed "No secrets configured." message.
#[test]
fn test_format_secrets_empty() {
    let no_secrets: Vec<serde_json::Value> = Vec::new();
    let rendered = format_secrets(&no_secrets);
    assert_eq!(rendered, "No secrets configured.");
}
4447
/// The secrets table lists every secret name and carries the NAME / ENV /
/// CREATED column headers.
#[test]
fn test_format_secrets_with_entries() {
    let github = serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"});
    let npm = serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"});
    let secrets = vec![github, npm];

    let rendered = format_secrets(&secrets);
    for fragment in ["GITHUB_TOKEN", "NPM_TOKEN", "NAME", "ENV", "CREATED"] {
        assert!(rendered.contains(fragment));
    }
}
4461
/// A secret with an explicit `env` shows both its own name and the env name;
/// a secret without `env` is still listed.
#[test]
fn test_format_secrets_with_env() {
    let mapped = serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"});
    let unmapped = serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"});
    let secrets = vec![mapped, unmapped];

    let rendered = format_secrets(&secrets);
    for fragment in ["GH_WORK", "GITHUB_TOKEN", "NPM_TOKEN"] {
        assert!(rendered.contains(fragment));
    }
}
4473
/// A very short `created_at` value ("now") still appears in the output.
#[test]
fn test_format_secrets_short_timestamp() {
    let entry = serde_json::json!({"name": "KEY", "created_at": "now"});
    let secrets = vec![entry];
    let rendered = format_secrets(&secrets);
    assert!(rendered.contains("now"));
}
4480
/// A very short `last_run_at` value ("short") still appears in the output.
#[test]
fn test_format_schedules_short_last_run_at() {
    let entry = serde_json::json!({
        "name": "test",
        "cron": "* * * * *",
        "enabled": true,
        "last_run_at": "short",
        "target_node": null
    });
    let rendered = format_schedules(&[entry]);
    assert!(rendered.contains("short"));
}
4494
/// A session whose command is a long run of multi-byte (emoji) characters is
/// formatted without panicking and the output contains `...`.
#[test]
fn test_format_sessions_multibyte_command_truncation() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    // Each emoji is several bytes in UTF-8, unlike the ASCII prefix.
    let emoji_session = Session {
        id: Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        command: "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let sessions = vec![emoji_session];

    let rendered = format_sessions(&sessions);
    assert!(rendered.contains("..."));
}
4528}