1use anyhow::Result;
2use clap::{Parser, Subcommand};
3#[cfg_attr(coverage, allow(unused_imports))]
4use pulpo_common::api::{
5 AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
6 PeersResponse,
7};
8use pulpo_common::session::{Runtime, Session};
9
// Top-level command-line arguments for the `pulpo` client.
//
// NOTE: plain `//` comments are used here on purpose. With clap's derive API, `///` doc
// comments are turned into help text, which would change the program's `--help` output.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version = env!("PULPO_VERSION"),
    args_conflicts_with_subcommands = true
)]
pub struct Cli {
    // Node to talk to. A value containing `:` is used as-is; a bare name is looked up
    // by `resolve_node` (falling back to `<name>:7433`).
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    // Explicit bearer token. When absent, `resolve_token` tries to discover one for
    // localhost targets.
    #[arg(long)]
    pub token: Option<String>,

    // Subcommand to run; `None` when the CLI is invoked with only a PATH or with nothing.
    #[command(subcommand)]
    pub command: Option<Commands>,

    // Positional working directory: `execute` creates a session there and attaches to it
    // when no subcommand is given.
    #[arg(value_name = "PATH")]
    pub path: Option<String>,
}
33
// Subcommands of the `pulpo` CLI; each variant is dispatched in `execute`.
//
// NOTE: plain `//` comments are used on purpose — clap's derive API turns `///` doc
// comments into help text, which would change the `--help` output.
#[derive(Subcommand, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    // Attach the terminal to an existing session (rejected for lost/killed sessions).
    #[command(visible_alias = "a")]
    Attach {
        name: String,
    },

    // Send text to a session's input; a single newline is sent when `text` is omitted.
    #[command(visible_alias = "i", visible_alias = "send")]
    Input {
        name: String,
        text: Option<String>,
    },

    // Create a new session and, unless `--detach` is given, attach to it.
    #[command(visible_alias = "s")]
    Spawn {
        // Session name; derived from the working directory when omitted.
        name: Option<String>,

        // Working directory; defaults to the current directory.
        #[arg(long)]
        workdir: Option<String>,

        // Forwarded to the API as `ink`.
        #[arg(long)]
        ink: Option<String>,

        // Forwarded to the API as `description`.
        #[arg(long)]
        description: Option<String>,

        // Do not attach after creating the session.
        #[arg(short, long)]
        detach: bool,

        // Forwarded to the API as `idle_threshold_secs`.
        #[arg(long)]
        idle_threshold: Option<u32>,

        // Let `select_best_node` choose the node the session is created on.
        #[arg(long)]
        auto: bool,

        // Ask the daemon to create the session in a worktree (`worktree: true`).
        #[arg(long)]
        worktree: bool,

        // Forwarded to the API as `runtime`.
        #[arg(long)]
        runtime: Option<String>,

        // Repeatable; forwarded to the API as the `secrets` list.
        #[arg(long)]
        secret: Vec<String>,

        // Everything after `--`; joined with single spaces into the session command.
        #[arg(last = true)]
        command: Vec<String>,
    },

    // List sessions; without `--all` only creating/active/idle/ready sessions are requested.
    #[command(visible_alias = "ls")]
    List {
        #[arg(short, long)]
        all: bool,
    },

    // Print a session's captured output, optionally following it.
    #[command(visible_alias = "l")]
    Logs {
        name: String,

        // Number of output lines requested from the daemon.
        #[arg(long, default_value = "100")]
        lines: usize,

        // Keep polling and print new output as it appears.
        #[arg(short, long)]
        follow: bool,
    },

    // Kill a session.
    #[command(visible_alias = "k")]
    Kill {
        name: String,
    },

    // Delete a session.
    #[command(visible_alias = "rm")]
    Delete {
        name: String,
    },

    // Resume a session and attach to it.
    #[command(visible_alias = "r")]
    Resume {
        name: String,
    },

    // Show the local node and its known peers.
    #[command(visible_alias = "n")]
    Nodes,

    // Show intervention events recorded for a session.
    #[command(visible_alias = "iv")]
    Interventions {
        name: String,
    },

    // Open the node's base URL in the default browser.
    Ui,

    // Manage schedules (see `ScheduleAction`).
    #[command(visible_alias = "sched")]
    Schedule {
        #[command(subcommand)]
        action: ScheduleAction,
    },

    // Manage secrets (see `SecretAction`).
    #[command(visible_alias = "sec")]
    Secret {
        #[command(subcommand)]
        action: SecretAction,
    },
}
172
// Actions of the `secret` subcommand, executed by `execute_secret`.
// (Plain `//` comments on purpose: clap would turn `///` doc comments into help text.)
#[derive(Subcommand, Debug)]
pub enum SecretAction {
    // Store `value` under `name`; `--env` is forwarded to the API as `env` when given.
    Set {
        name: String,
        value: String,
        #[arg(long)]
        env: Option<String>,
    },
    // List the configured secrets.
    #[command(visible_alias = "ls")]
    List,
    // Delete the secret called `name`.
    #[command(visible_alias = "rm")]
    Delete {
        name: String,
    },
}
195
// Actions of the `schedule` subcommand, executed by `execute_schedule`.
// (Plain `//` comments on purpose: clap would turn `///` doc comments into help text.)
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    // Create a schedule. `install` is accepted as a hidden alias.
    #[command(alias = "install")]
    Add {
        name: String,
        // Cron expression, forwarded to the API as `cron`.
        cron: String,
        // Working directory; defaults to the current directory.
        #[arg(long)]
        workdir: Option<String>,
        // Forwarded to the API as `target_node`.
        #[arg(long)]
        node: Option<String>,
        // Forwarded to the API as `ink`.
        #[arg(long)]
        ink: Option<String>,
        // Forwarded to the API as `description`.
        #[arg(long)]
        description: Option<String>,
        // Everything after `--`; joined with single spaces into the scheduled command.
        #[arg(last = true)]
        command: Vec<String>,
    },
    // List schedules (`ls` is a hidden alias).
    #[command(alias = "ls")]
    List,
    // Remove a schedule (`rm` is a hidden alias).
    #[command(alias = "rm")]
    Remove {
        name: String,
    },
    // Disable a schedule (sends `enabled: false`).
    Pause {
        name: String,
    },
    // Re-enable a schedule (sends `enabled: true`).
    Resume {
        name: String,
    },
}
241
/// Text that `follow_logs` looks for in fetched session output; once the output contains
/// it, following stops.
const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
244
/// Turns `path` into an absolute path string.
///
/// Absolute inputs are returned unchanged. Relative inputs are joined onto the current
/// working directory (lossily converted to UTF-8); if the working directory cannot be
/// determined, the input is returned as-is.
fn resolve_path(path: &str) -> String {
    let candidate = std::path::Path::new(path);
    if candidate.is_absolute() {
        return path.to_owned();
    }
    match std::env::current_dir() {
        Ok(cwd) => cwd.join(candidate).to_string_lossy().into_owned(),
        Err(_) => path.to_owned(),
    }
}
257
/// Derives a kebab-case session name from the last component of `path`.
///
/// ASCII letters and digits are lowercased and kept; every run of other characters
/// becomes a single `-`, and leading/trailing dashes are removed. Falls back to
/// `"session"` when the path has no usable final component or nothing survives.
fn derive_session_name(path: &str) -> String {
    let basename = std::path::Path::new(path)
        .file_name()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("session");

    // Build the slug in one pass, never emitting two dashes in a row.
    let mut slug = String::new();
    for ch in basename.chars() {
        if ch.is_ascii_alphanumeric() {
            slug.push(ch.to_ascii_lowercase());
        } else if !slug.ends_with('-') {
            slug.push('-');
        }
    }

    match slug.trim_matches('-') {
        "" => "session".to_owned(),
        trimmed => trimmed.to_owned(),
    }
}
290
291async fn deduplicate_session_name(
293 client: &reqwest::Client,
294 base: &str,
295 name: &str,
296 token: Option<&str>,
297) -> String {
298 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
300 .send()
301 .await;
302 match resp {
303 Ok(r) if r.status().is_success() => {
304 for i in 2..=99 {
306 let candidate = format!("{name}-{i}");
307 let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
308 .send()
309 .await;
310 match resp {
311 Ok(r) if r.status().is_success() => {}
312 _ => return candidate,
313 }
314 }
315 format!("{name}-100")
316 }
317 _ => name.to_owned(),
318 }
319}
320
/// Builds the HTTP base URL for `node` (a `host:port` string) by prefixing `http://`.
pub fn base_url(node: &str) -> String {
    ["http://", node].concat()
}
325
/// JSON body of the session output endpoint, as deserialized by `fetch_output`.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// The captured session output text.
    output: String,
}
331
332const fn session_runtime(session: &Session) -> &'static str {
334 match session.runtime {
335 Runtime::Tmux => "tmux",
336 Runtime::Docker => "docker",
337 }
338}
339
340fn format_sessions(sessions: &[Session]) -> String {
341 if sessions.is_empty() {
342 return "No sessions.".into();
343 }
344 let mut lines = vec![format!(
345 "{:<10} {:<24} {:<12} {:<8} {}",
346 "ID", "NAME", "STATUS", "RUNTIME", "COMMAND"
347 )];
348 for s in sessions {
349 let cmd_display = if s.command.len() > 50 {
350 let truncated: String = s.command.chars().take(47).collect();
351 format!("{truncated}...")
352 } else {
353 s.command.clone()
354 };
355 let short_id = &s.id.to_string()[..8];
356 let has_pr = s
357 .metadata
358 .as_ref()
359 .is_some_and(|m| m.contains_key("pr_url"));
360 let name_display = match (s.worktree_path.is_some(), has_pr) {
361 (true, true) => format!("{} [wt] [PR]", s.name),
362 (true, false) => format!("{} [wt]", s.name),
363 (false, true) => format!("{} [PR]", s.name),
364 (false, false) => s.name.clone(),
365 };
366 lines.push(format!(
367 "{:<10} {:<24} {:<12} {:<8} {}",
368 short_id,
369 name_display,
370 s.status,
371 session_runtime(s),
372 cmd_display
373 ));
374 }
375 lines.join("\n")
376}
377
378fn format_nodes(resp: &PeersResponse) -> String {
380 let mut lines = vec![format!(
381 "{:<20} {:<25} {:<10} {}",
382 "NAME", "ADDRESS", "STATUS", "SESSIONS"
383 )];
384 lines.push(format!(
385 "{:<20} {:<25} {:<10} {}",
386 resp.local.name, "(local)", "online", "-"
387 ));
388 for p in &resp.peers {
389 let sessions = p
390 .session_count
391 .map_or_else(|| "-".into(), |c| c.to_string());
392 lines.push(format!(
393 "{:<20} {:<25} {:<10} {}",
394 p.name, p.address, p.status, sessions
395 ));
396 }
397 lines.join("\n")
398}
399
400fn format_interventions(events: &[InterventionEventResponse]) -> String {
402 if events.is_empty() {
403 return "No intervention events.".into();
404 }
405 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
406 for e in events {
407 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
408 }
409 lines.join("\n")
410}
411
/// Builds (without running) the platform command that opens `url` in the default browser:
/// `open <url>` on macOS, `cmd /C start <url>` on Windows and `xdg-open <url>` elsewhere.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    // Select the launcher and any arguments that must precede the URL.
    #[cfg(target_os = "macos")]
    let (program, leading_args): (&str, &[&str]) = ("open", &[]);
    #[cfg(target_os = "windows")]
    let (program, leading_args): (&str, &[&str]) = ("cmd", &["/C", "start"]);
    #[cfg(not(any(target_os = "macos", target_os = "windows")))]
    let (program, leading_args): (&str, &[&str]) = ("xdg-open", &[]);

    let mut launcher = std::process::Command::new(program);
    launcher.args(leading_args).arg(url);
    launcher
}
441
442#[cfg(not(coverage))]
444fn open_browser(url: &str) -> Result<()> {
445 build_open_command(url).status()?;
446 Ok(())
447}
448
/// Coverage-build stand-in for `open_browser`: spawns nothing and always succeeds.
#[cfg(coverage)]
fn open_browser(_url: &str) -> Result<()> {
    Ok(())
}
454
/// Builds (without running) the command that attaches the terminal to a backend session.
///
/// Ids of the form `docker:<container>` yield `docker exec -it <container> /bin/sh`.
/// Any other id yields `tmux attach-session -t <id>`, except on Windows where the command
/// merely echoes that attaching is unavailable.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    match backend_session_id.strip_prefix("docker:") {
        Some(container) => {
            let mut docker = std::process::Command::new("docker");
            docker.arg("exec").arg("-it").arg(container).arg("/bin/sh");
            docker
        }
        None => {
            #[cfg(not(target_os = "windows"))]
            {
                let mut tmux = std::process::Command::new("tmux");
                tmux.arg("attach-session").arg("-t").arg(backend_session_id);
                tmux
            }
            #[cfg(target_os = "windows")]
            {
                let mut notice = std::process::Command::new("cmd");
                notice.arg("/C").arg("echo").arg(
                    "Attach not available on Windows. Use the web UI or --runtime docker.",
                );
                notice
            }
        }
    }
}
484
485#[cfg(not(any(test, coverage, target_os = "windows")))]
487fn attach_session(backend_session_id: &str) -> Result<()> {
488 let status = build_attach_command(backend_session_id).status()?;
489 if !status.success() {
490 anyhow::bail!("attach failed with {status}");
491 }
492 Ok(())
493}
494
/// Windows variant of `attach_session`.
///
/// tmux is not available on Windows, so tmux-backed sessions only print a hint and
/// succeed. Docker-backed sessions (ids starting with `docker:`) are attached through
/// the `docker exec` command from `build_attach_command`, which is what the hint's
/// `--runtime docker` suggestion relies on.
///
/// # Errors
/// For docker-backed sessions, fails when the process cannot be started or exits with a
/// non-success status.
#[cfg(all(target_os = "windows", not(test), not(coverage)))]
fn attach_session(backend_session_id: &str) -> Result<()> {
    if backend_session_id.starts_with("docker:") {
        let status = build_attach_command(backend_session_id).status()?;
        if !status.success() {
            anyhow::bail!("attach failed with {status}");
        }
        return Ok(());
    }
    eprintln!("tmux attach is not available on Windows. Use the web UI or --runtime docker.");
    Ok(())
}
501
/// Test/coverage stand-in for `attach_session`: never spawns a process and always
/// succeeds, so code paths that attach can run without a terminal.
#[cfg(any(test, coverage))]
#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    Ok(())
}
508
509fn api_error(text: &str) -> anyhow::Error {
511 serde_json::from_str::<serde_json::Value>(text)
512 .ok()
513 .and_then(|v| v["error"].as_str().map(String::from))
514 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
515}
516
517async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
519 if resp.status().is_success() {
520 Ok(resp.text().await?)
521 } else {
522 let text = resp.text().await?;
523 Err(api_error(&text))
524 }
525}
526
527fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
529 if err.is_connect() {
530 anyhow::anyhow!(
531 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
532 )
533 } else {
534 anyhow::anyhow!("Network error connecting to {node}: {err}")
535 }
536}
537
/// Reports whether `node` (a `host[:port]` string) refers to the local machine.
///
/// Recognizes the hosts `localhost` and `127.0.0.1`, the bare IPv6 loopback `::1`, and
/// anything starting with the bracketed form `[::1]`.
fn is_localhost(node: &str) -> bool {
    if node == "::1" || node.starts_with("[::1]") {
        return true;
    }
    // The host is everything before the first ':' (or the whole string without one).
    let host = node.split_once(':').map_or(node, |(host, _rest)| host);
    matches!(host, "localhost" | "127.0.0.1")
}
543
544async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
546 let resp = client
547 .get(format!("{base}/api/v1/auth/token"))
548 .send()
549 .await
550 .ok()?;
551 let body: AuthTokenResponse = resp.json().await.ok()?;
552 if body.token.is_empty() {
553 None
554 } else {
555 Some(body.token)
556 }
557}
558
559async fn resolve_token(
561 client: &reqwest::Client,
562 base: &str,
563 node: &str,
564 explicit: Option<&str>,
565) -> Option<String> {
566 if let Some(t) = explicit {
567 return Some(t.to_owned());
568 }
569 if is_localhost(node) {
570 return discover_token(client, base).await;
571 }
572 None
573}
574
/// Reports whether `node` is a bare name (no `:`), i.e. it carries no port and still has
/// to be turned into an address.
fn node_needs_resolution(node: &str) -> bool {
    node.find(':').is_none()
}
579
580#[cfg(not(coverage))]
587async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
588 if !node_needs_resolution(node) {
590 return (node.to_owned(), None);
591 }
592
593 let local_base = "http://localhost:7433";
595 let mut resolved_address: Option<String> = None;
596
597 if let Ok(resp) = client
598 .get(format!("{local_base}/api/v1/peers"))
599 .send()
600 .await
601 && let Ok(peers_resp) = resp.json::<PeersResponse>().await
602 {
603 for peer in &peers_resp.peers {
604 if peer.name == node {
605 resolved_address = Some(peer.address.clone());
606 break;
607 }
608 }
609 }
610
611 let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
612
613 let peer_token = if let Ok(resp) = client
615 .get(format!("{local_base}/api/v1/config"))
616 .send()
617 .await
618 && let Ok(config) = resp.json::<ConfigResponse>().await
619 && let Some(entry) = config.peers.get(node)
620 {
621 entry.token().map(String::from)
622 } else {
623 None
624 };
625
626 (address, peer_token)
627}
628
/// Coverage-build variant of `resolve_node` that performs no network lookups: bare names
/// become `<name>:7433`, everything else is returned unchanged, and no token is produced.
#[cfg(coverage)]
async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
    let address = if node_needs_resolution(node) {
        format!("{node}:7433")
    } else {
        node.to_owned()
    };
    (address, None)
}
638
639fn authed_get(
641 client: &reqwest::Client,
642 url: String,
643 token: Option<&str>,
644) -> reqwest::RequestBuilder {
645 let req = client.get(url);
646 if let Some(t) = token {
647 req.bearer_auth(t)
648 } else {
649 req
650 }
651}
652
653fn authed_post(
655 client: &reqwest::Client,
656 url: String,
657 token: Option<&str>,
658) -> reqwest::RequestBuilder {
659 let req = client.post(url);
660 if let Some(t) = token {
661 req.bearer_auth(t)
662 } else {
663 req
664 }
665}
666
667fn authed_delete(
669 client: &reqwest::Client,
670 url: String,
671 token: Option<&str>,
672) -> reqwest::RequestBuilder {
673 let req = client.delete(url);
674 if let Some(t) = token {
675 req.bearer_auth(t)
676 } else {
677 req
678 }
679}
680
681#[cfg(not(coverage))]
683fn authed_put(
684 client: &reqwest::Client,
685 url: String,
686 token: Option<&str>,
687) -> reqwest::RequestBuilder {
688 let req = client.put(url);
689 if let Some(t) = token {
690 req.bearer_auth(t)
691 } else {
692 req
693 }
694}
695
696async fn fetch_output(
698 client: &reqwest::Client,
699 base: &str,
700 name: &str,
701 lines: usize,
702 token: Option<&str>,
703) -> Result<String> {
704 let resp = authed_get(
705 client,
706 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
707 token,
708 )
709 .send()
710 .await?;
711 let text = ok_or_api_error(resp).await?;
712 let output: OutputResponse = serde_json::from_str(&text)?;
713 Ok(output.output)
714}
715
716async fn fetch_session_status(
718 client: &reqwest::Client,
719 base: &str,
720 name: &str,
721 token: Option<&str>,
722) -> Result<String> {
723 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
724 .send()
725 .await?;
726 let text = ok_or_api_error(resp).await?;
727 let session: Session = serde_json::from_str(&text)?;
728 Ok(session.status.to_string())
729}
730
731async fn check_session_alive(
734 client: &reqwest::Client,
735 base: &str,
736 name: &str,
737 token: Option<&str>,
738) -> Result<()> {
739 for _ in 0..3 {
741 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
742 if let Ok(status) = fetch_session_status(client, base, name, token).await {
743 match status.as_str() {
744 "creating" => continue, "lost" | "killed" => {
746 anyhow::bail!(
747 "Session \"{name}\" exited immediately — the command may have failed.\n Check logs: pulpo logs {name}"
748 );
749 }
750 _ => return Ok(()), }
752 }
753 break;
755 }
756 Ok(())
757}
758
/// Returns the part of `new` that was not already present at the end of `prev`.
///
/// Both inputs are treated as scrolling line windows. The function searches, from the
/// bottom of `new` upwards, for the last line of `prev`, and accepts the match when the
/// lines leading up to it also agree with the tail of `prev`. Everything after the
/// matched line is returned.
///
/// * `prev` empty → all of `new` is returned.
/// * `new` has no lines → `""`.
/// * No overlap found → all of `new` is returned.
///
/// Lines are compared without their terminators, and the returned slice starts exactly
/// at the first unseen line for both `\n` and `\r\n` endings.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }
    // A non-empty `prev` always has at least one line; stay panic-free regardless.
    let Some(&last_prev) = prev_lines.last() else {
        return new;
    };

    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Compare as many trailing lines of `prev` as fit before position `i` in `new`.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail != new_overlap {
            continue;
        }
        if i + 1 == new_lines.len() {
            // The overlap reaches the end of `new`: nothing is new.
            return "";
        }
        // Byte offset of line `i + 1`, measured on the real line pieces (terminators
        // included) so that `\r\n` endings are skipped in full. `split_inclusive('\n')`
        // yields the same pieces `lines()` is built from, so the offset is always a valid
        // char boundary.
        let consumed: usize = new
            .split_inclusive('\n')
            .take(i + 1)
            .map(str::len)
            .sum();
        return new.get(consumed..).unwrap_or("");
    }

    new
}
801
802async fn follow_logs(
804 client: &reqwest::Client,
805 base: &str,
806 name: &str,
807 lines: usize,
808 token: Option<&str>,
809 writer: &mut (dyn std::io::Write + Send),
810) -> Result<()> {
811 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
812 write!(writer, "{prev_output}")?;
813
814 let mut unchanged_ticks: u32 = 0;
815
816 loop {
817 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
818
819 let new_output = fetch_output(client, base, name, lines, token).await?;
821
822 let diff = diff_output(&prev_output, &new_output);
823 if diff.is_empty() {
824 unchanged_ticks += 1;
825 } else {
826 write!(writer, "{diff}")?;
827 unchanged_ticks = 0;
828 }
829
830 if new_output.contains(AGENT_EXIT_MARKER) {
832 break;
833 }
834
835 prev_output = new_output;
836
837 if unchanged_ticks >= 3 {
839 let status = fetch_session_status(client, base, name, token).await?;
840 let is_terminal = status == "ready" || status == "killed" || status == "lost";
841 if is_terminal {
842 break;
843 }
844 }
845 }
846 Ok(())
847}
848
849#[cfg(not(coverage))]
853async fn execute_schedule(
854 client: &reqwest::Client,
855 action: &ScheduleAction,
856 base: &str,
857 token: Option<&str>,
858) -> Result<String> {
859 match action {
860 ScheduleAction::Add {
861 name,
862 cron,
863 workdir,
864 node,
865 ink,
866 description,
867 command,
868 } => {
869 let cmd = if command.is_empty() {
870 None
871 } else {
872 Some(command.join(" "))
873 };
874 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
875 std::env::current_dir()
876 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
877 });
878 let mut body = serde_json::json!({
879 "name": name,
880 "cron": cron,
881 "workdir": resolved_workdir,
882 });
883 if let Some(c) = &cmd {
884 body["command"] = serde_json::json!(c);
885 }
886 if let Some(n) = node {
887 body["target_node"] = serde_json::json!(n);
888 }
889 if let Some(i) = ink {
890 body["ink"] = serde_json::json!(i);
891 }
892 if let Some(d) = description {
893 body["description"] = serde_json::json!(d);
894 }
895 let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
896 .json(&body)
897 .send()
898 .await?;
899 ok_or_api_error(resp).await?;
900 Ok(format!("Created schedule \"{name}\""))
901 }
902 ScheduleAction::List => {
903 let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
904 .send()
905 .await?;
906 let text = ok_or_api_error(resp).await?;
907 let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
908 Ok(format_schedules(&schedules))
909 }
910 ScheduleAction::Remove { name } => {
911 let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
912 .send()
913 .await?;
914 ok_or_api_error(resp).await?;
915 Ok(format!("Removed schedule \"{name}\""))
916 }
917 ScheduleAction::Pause { name } => {
918 let body = serde_json::json!({ "enabled": false });
919 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
920 .json(&body)
921 .send()
922 .await?;
923 ok_or_api_error(resp).await?;
924 Ok(format!("Paused schedule \"{name}\""))
925 }
926 ScheduleAction::Resume { name } => {
927 let body = serde_json::json!({ "enabled": true });
928 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
929 .json(&body)
930 .send()
931 .await?;
932 ok_or_api_error(resp).await?;
933 Ok(format!("Resumed schedule \"{name}\""))
934 }
935 }
936}
937
/// Coverage-build stand-in for `execute_schedule`: performs no requests and always
/// returns an empty message.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_schedule(
    _client: &reqwest::Client,
    _action: &ScheduleAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::new())
}
949
950#[cfg_attr(coverage, allow(dead_code))]
954fn format_secrets(secrets: &[serde_json::Value]) -> String {
955 if secrets.is_empty() {
956 return "No secrets configured.".into();
957 }
958 let mut lines = vec![format!("{:<24} {:<24} {}", "NAME", "ENV", "CREATED")];
959 for s in secrets {
960 let name = s["name"].as_str().unwrap_or("?");
961 let env_display = s["env"]
962 .as_str()
963 .map_or_else(|| name.to_owned(), String::from);
964 let created = s["created_at"]
965 .as_str()
966 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
967 lines.push(format!("{name:<24} {env_display:<24} {created}"));
968 }
969 lines.join("\n")
970}
971
972#[cfg(not(coverage))]
974async fn execute_secret(
975 client: &reqwest::Client,
976 action: &SecretAction,
977 base: &str,
978 token: Option<&str>,
979) -> Result<String> {
980 match action {
981 SecretAction::Set { name, value, env } => {
982 let mut body = serde_json::json!({ "value": value });
983 if let Some(e) = env {
984 body["env"] = serde_json::json!(e);
985 }
986 let resp = authed_put(client, format!("{base}/api/v1/secrets/{name}"), token)
987 .json(&body)
988 .send()
989 .await?;
990 ok_or_api_error(resp).await?;
991 Ok(format!("Secret \"{name}\" set."))
992 }
993 SecretAction::List => {
994 let resp = authed_get(client, format!("{base}/api/v1/secrets"), token)
995 .send()
996 .await?;
997 let text = ok_or_api_error(resp).await?;
998 let parsed: serde_json::Value = serde_json::from_str(&text)?;
999 let secrets = parsed["secrets"].as_array().map_or(&[][..], Vec::as_slice);
1000 Ok(format_secrets(secrets))
1001 }
1002 SecretAction::Delete { name } => {
1003 let resp = authed_delete(client, format!("{base}/api/v1/secrets/{name}"), token)
1004 .send()
1005 .await?;
1006 ok_or_api_error(resp).await?;
1007 Ok(format!("Secret \"{name}\" deleted."))
1008 }
1009 }
1010}
1011
/// Coverage-build stand-in for `execute_secret`: performs no requests and always returns
/// an empty message.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn execute_secret(
    _client: &reqwest::Client,
    _action: &SecretAction,
    _base: &str,
    _token: Option<&str>,
) -> Result<String> {
    Ok(String::new())
}
1023
1024#[cfg_attr(coverage, allow(dead_code))]
1026fn format_schedules(schedules: &[serde_json::Value]) -> String {
1027 if schedules.is_empty() {
1028 return "No schedules.".into();
1029 }
1030 let mut lines = vec![format!(
1031 "{:<20} {:<18} {:<8} {:<12} {}",
1032 "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
1033 )];
1034 for s in schedules {
1035 let name = s["name"].as_str().unwrap_or("?");
1036 let cron = s["cron"].as_str().unwrap_or("?");
1037 let enabled = if s["enabled"].as_bool().unwrap_or(true) {
1038 "yes"
1039 } else {
1040 "no"
1041 };
1042 let last_run = s["last_run_at"]
1043 .as_str()
1044 .map_or("-", |t| if t.len() >= 16 { &t[..16] } else { t });
1045 let node = s["target_node"].as_str().unwrap_or("local");
1046 lines.push(format!(
1047 "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
1048 ));
1049 }
1050 lines.join("\n")
1051}
1052
1053#[cfg(not(coverage))]
1057async fn select_best_node(
1058 client: &reqwest::Client,
1059 base: &str,
1060 token: Option<&str>,
1061) -> Result<(String, String)> {
1062 let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
1063 .send()
1064 .await?;
1065 let text = ok_or_api_error(resp).await?;
1066 let peers_resp: PeersResponse = serde_json::from_str(&text)?;
1067
1068 let mut best: Option<(String, String, f64)> = None; for peer in &peers_resp.peers {
1072 if peer.status != pulpo_common::peer::PeerStatus::Online {
1073 continue;
1074 }
1075 let sessions = peer.session_count.unwrap_or(0);
1076 let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
1077 #[allow(clippy::cast_precision_loss)]
1079 let score = sessions as f64 - (mem as f64 / 1024.0);
1080 if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
1081 best = Some((peer.address.clone(), peer.name.clone(), score));
1082 }
1083 }
1084
1085 match best {
1087 Some((addr, name, _)) => Ok((addr, name)),
1088 None => Ok(("localhost:7433".into(), peers_resp.local.name)),
1089 }
1090}
1091
/// Coverage-build stand-in for `select_best_node`: performs no requests and always
/// selects `localhost:7433` with the name `local`.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn select_best_node(
    _client: &reqwest::Client,
    _base: &str,
    _token: Option<&str>,
) -> Result<(String, String)> {
    Ok(("localhost:7433".into(), "local".into()))
}
1102
1103#[allow(clippy::too_many_lines)]
1105pub async fn execute(cli: &Cli) -> Result<String> {
1106 let client = reqwest::Client::new();
1107 let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
1108 let url = base_url(&resolved_node);
1109 let node = &resolved_node;
1110 let token = resolve_token(&client, &url, node, cli.token.as_deref())
1111 .await
1112 .or(peer_token);
1113
1114 if cli.command.is_none() && cli.path.is_none() {
1116 use clap::CommandFactory;
1118 let mut cmd = Cli::command();
1119 cmd.print_help()?;
1120 println!();
1121 return Ok(String::new());
1122 }
1123 if cli.command.is_none() {
1124 let path = cli.path.as_deref().unwrap_or(".");
1125 let resolved_workdir = resolve_path(path);
1126 let base_name = derive_session_name(&resolved_workdir);
1127 let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
1128 let body = serde_json::json!({
1129 "name": name,
1130 "workdir": resolved_workdir,
1131 });
1132 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1133 .json(&body)
1134 .send()
1135 .await
1136 .map_err(|e| friendly_error(&e, node))?;
1137 let text = ok_or_api_error(resp).await?;
1138 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1139 let msg = format!(
1140 "Created session \"{}\" ({})",
1141 resp.session.name, resp.session.id
1142 );
1143 let backend_id = resp
1144 .session
1145 .backend_session_id
1146 .as_deref()
1147 .unwrap_or(&resp.session.name);
1148 eprintln!("{msg}");
1149 check_session_alive(&client, &url, &resp.session.name, token.as_deref()).await?;
1150 attach_session(backend_id)?;
1151 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1152 }
1153
1154 match cli.command.as_ref().unwrap() {
1155 Commands::Attach { name } => {
1156 let resp = authed_get(
1158 &client,
1159 format!("{url}/api/v1/sessions/{name}"),
1160 token.as_deref(),
1161 )
1162 .send()
1163 .await
1164 .map_err(|e| friendly_error(&e, node))?;
1165 let text = ok_or_api_error(resp).await?;
1166 let session: Session = serde_json::from_str(&text)?;
1167 match session.status.to_string().as_str() {
1168 "lost" => {
1169 anyhow::bail!(
1170 "Session \"{name}\" is lost (agent process died). Resume it first:\n pulpo resume {name}"
1171 );
1172 }
1173 "killed" => {
1174 anyhow::bail!(
1175 "Session \"{name}\" is {} — cannot attach to a killed session.",
1176 session.status
1177 );
1178 }
1179 _ => {}
1180 }
1181 let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1182 attach_session(&backend_id)?;
1183 Ok(format!("Detached from session {name}."))
1184 }
1185 Commands::Input { name, text } => {
1186 let input_text = text.as_deref().unwrap_or("\n");
1187 let body = serde_json::json!({ "text": input_text });
1188 let resp = authed_post(
1189 &client,
1190 format!("{url}/api/v1/sessions/{name}/input"),
1191 token.as_deref(),
1192 )
1193 .json(&body)
1194 .send()
1195 .await
1196 .map_err(|e| friendly_error(&e, node))?;
1197 ok_or_api_error(resp).await?;
1198 Ok(format!("Sent input to session {name}."))
1199 }
1200 Commands::List { all } => {
1201 let list_url = if *all {
1202 format!("{url}/api/v1/sessions")
1203 } else {
1204 format!("{url}/api/v1/sessions?status=creating,active,idle,ready")
1205 };
1206 let resp = authed_get(&client, list_url, token.as_deref())
1207 .send()
1208 .await
1209 .map_err(|e| friendly_error(&e, node))?;
1210 let text = ok_or_api_error(resp).await?;
1211 let sessions: Vec<Session> = serde_json::from_str(&text)?;
1212 Ok(format_sessions(&sessions))
1213 }
1214 Commands::Nodes => {
1215 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1216 .send()
1217 .await
1218 .map_err(|e| friendly_error(&e, node))?;
1219 let text = ok_or_api_error(resp).await?;
1220 let resp: PeersResponse = serde_json::from_str(&text)?;
1221 Ok(format_nodes(&resp))
1222 }
1223 Commands::Spawn {
1224 workdir,
1225 name,
1226 ink,
1227 description,
1228 detach,
1229 idle_threshold,
1230 auto,
1231 worktree,
1232 runtime,
1233 secret,
1234 command,
1235 } => {
1236 let cmd = if command.is_empty() {
1237 None
1238 } else {
1239 Some(command.join(" "))
1240 };
1241 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1243 std::env::current_dir()
1244 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1245 });
1246 let resolved_name = if let Some(n) = name {
1248 n.clone()
1249 } else {
1250 let base_name = derive_session_name(&resolved_workdir);
1251 deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1252 };
1253 let mut body = serde_json::json!({
1254 "name": resolved_name,
1255 "workdir": resolved_workdir,
1256 });
1257 if let Some(c) = &cmd {
1258 body["command"] = serde_json::json!(c);
1259 }
1260 if let Some(i) = ink {
1261 body["ink"] = serde_json::json!(i);
1262 }
1263 if let Some(d) = description {
1264 body["description"] = serde_json::json!(d);
1265 }
1266 if let Some(t) = idle_threshold {
1267 body["idle_threshold_secs"] = serde_json::json!(t);
1268 }
1269 if *worktree {
1270 body["worktree"] = serde_json::json!(true);
1271 eprintln!(
1272 "Worktree: branch pulpo/{resolved_name} in {resolved_workdir}/.pulpo/worktrees/{resolved_name}/"
1273 );
1274 }
1275 if let Some(rt) = runtime {
1276 body["runtime"] = serde_json::json!(rt);
1277 }
1278 if !secret.is_empty() {
1279 body["secrets"] = serde_json::json!(secret);
1280 }
1281 let spawn_url = if *auto {
1282 let (auto_addr, auto_name) =
1283 select_best_node(&client, &url, token.as_deref()).await?;
1284 eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1285 base_url(&auto_addr)
1286 } else {
1287 url.clone()
1288 };
1289 let resp = authed_post(
1290 &client,
1291 format!("{spawn_url}/api/v1/sessions"),
1292 token.as_deref(),
1293 )
1294 .json(&body)
1295 .send()
1296 .await
1297 .map_err(|e| friendly_error(&e, node))?;
1298 let text = ok_or_api_error(resp).await?;
1299 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1300 let msg = format!(
1301 "Created session \"{}\" ({})",
1302 resp.session.name, resp.session.id
1303 );
1304 if !detach {
1305 let backend_id = resp
1306 .session
1307 .backend_session_id
1308 .as_deref()
1309 .unwrap_or(&resp.session.name);
1310 eprintln!("{msg}");
1311 check_session_alive(&client, &url, &resp.session.name, token.as_deref()).await?;
1312 attach_session(backend_id)?;
1313 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1314 }
1315 Ok(msg)
1316 }
1317 Commands::Kill { name } => {
1318 let resp = authed_post(
1319 &client,
1320 format!("{url}/api/v1/sessions/{name}/kill"),
1321 token.as_deref(),
1322 )
1323 .send()
1324 .await
1325 .map_err(|e| friendly_error(&e, node))?;
1326 ok_or_api_error(resp).await?;
1327 Ok(format!("Session {name} killed."))
1328 }
1329 Commands::Delete { name } => {
1330 let resp = authed_delete(
1331 &client,
1332 format!("{url}/api/v1/sessions/{name}"),
1333 token.as_deref(),
1334 )
1335 .send()
1336 .await
1337 .map_err(|e| friendly_error(&e, node))?;
1338 ok_or_api_error(resp).await?;
1339 Ok(format!("Session {name} deleted."))
1340 }
1341 Commands::Logs {
1342 name,
1343 lines,
1344 follow,
1345 } => {
1346 if *follow {
1347 let mut stdout = std::io::stdout();
1348 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1349 .await
1350 .map_err(|e| {
1351 match e.downcast::<reqwest::Error>() {
1353 Ok(re) => friendly_error(&re, node),
1354 Err(other) => other,
1355 }
1356 })?;
1357 Ok(String::new())
1358 } else {
1359 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1360 .await
1361 .map_err(|e| match e.downcast::<reqwest::Error>() {
1362 Ok(re) => friendly_error(&re, node),
1363 Err(other) => other,
1364 })?;
1365 Ok(output)
1366 }
1367 }
1368 Commands::Interventions { name } => {
1369 let resp = authed_get(
1370 &client,
1371 format!("{url}/api/v1/sessions/{name}/interventions"),
1372 token.as_deref(),
1373 )
1374 .send()
1375 .await
1376 .map_err(|e| friendly_error(&e, node))?;
1377 let text = ok_or_api_error(resp).await?;
1378 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1379 Ok(format_interventions(&events))
1380 }
1381 Commands::Ui => {
1382 let dashboard = base_url(node);
1383 open_browser(&dashboard)?;
1384 Ok(format!("Opening {dashboard}"))
1385 }
1386 Commands::Resume { name } => {
1387 let resp = authed_post(
1388 &client,
1389 format!("{url}/api/v1/sessions/{name}/resume"),
1390 token.as_deref(),
1391 )
1392 .send()
1393 .await
1394 .map_err(|e| friendly_error(&e, node))?;
1395 let text = ok_or_api_error(resp).await?;
1396 let session: Session = serde_json::from_str(&text)?;
1397 let backend_id = session
1398 .backend_session_id
1399 .as_deref()
1400 .unwrap_or(&session.name);
1401 eprintln!("Resumed session \"{}\"", session.name);
1402 check_session_alive(&client, &url, name, token.as_deref()).await?;
1403 attach_session(backend_id)?;
1404 Ok(format!("Detached from session \"{}\".", session.name))
1405 }
1406 Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1407 .await
1408 .map_err(|e| match e.downcast::<reqwest::Error>() {
1409 Ok(re) => friendly_error(&re, node),
1410 Err(other) => other,
1411 }),
1412 Commands::Secret { action } => execute_secret(&client, action, &url, token.as_deref())
1413 .await
1414 .map_err(|e| match e.downcast::<reqwest::Error>() {
1415 Ok(re) => friendly_error(&re, node),
1416 Err(other) => other,
1417 }),
1418 }
1419}
1420
1421#[cfg(test)]
1422mod tests {
1423 use super::*;
1424
1425 #[test]
1426 fn test_base_url() {
1427 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1428 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1429 }
1430
1431 #[test]
1432 fn test_cli_parse_list() {
1433 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1434 assert_eq!(cli.node, "localhost:7433");
1435 assert!(matches!(cli.command, Some(Commands::List { .. })));
1436 }
1437
1438 #[test]
1439 fn test_cli_parse_nodes() {
1440 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1441 assert!(matches!(cli.command, Some(Commands::Nodes)));
1442 }
1443
1444 #[test]
1445 fn test_cli_parse_ui() {
1446 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1447 assert!(matches!(cli.command, Some(Commands::Ui)));
1448 }
1449
1450 #[test]
1451 fn test_cli_parse_ui_custom_node() {
1452 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1453 assert_eq!(cli.node, "mac-mini:7433");
1455 assert_eq!(cli.path.as_deref(), Some("ui"));
1456 }
1457
1458 #[test]
1459 fn test_build_open_command() {
1460 let cmd = build_open_command("http://localhost:7433");
1461 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1462 assert_eq!(args, vec!["http://localhost:7433"]);
1463 #[cfg(target_os = "macos")]
1464 assert_eq!(cmd.get_program(), "open");
1465 #[cfg(target_os = "linux")]
1466 assert_eq!(cmd.get_program(), "xdg-open");
1467 }
1468
1469 #[test]
1470 fn test_cli_parse_spawn() {
1471 let cli = Cli::try_parse_from([
1472 "pulpo",
1473 "spawn",
1474 "my-task",
1475 "--workdir",
1476 "/tmp/repo",
1477 "--",
1478 "claude",
1479 "-p",
1480 "Fix the bug",
1481 ])
1482 .unwrap();
1483 assert!(matches!(
1484 &cli.command,
1485 Some(Commands::Spawn { name, workdir, command, .. })
1486 if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1487 && command == &["claude", "-p", "Fix the bug"]
1488 ));
1489 }
1490
1491 #[test]
1492 fn test_cli_parse_spawn_with_ink() {
1493 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1494 assert!(matches!(
1495 &cli.command,
1496 Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1497 ));
1498 }
1499
1500 #[test]
1501 fn test_cli_parse_spawn_with_description() {
1502 let cli =
1503 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1504 .unwrap();
1505 assert!(matches!(
1506 &cli.command,
1507 Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1508 ));
1509 }
1510
1511 #[test]
1512 fn test_cli_parse_spawn_name_positional() {
1513 let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1514 assert!(matches!(
1515 &cli.command,
1516 Some(Commands::Spawn { name, command, .. })
1517 if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1518 ));
1519 }
1520
1521 #[test]
1522 fn test_cli_parse_spawn_no_command() {
1523 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1524 assert!(matches!(
1525 &cli.command,
1526 Some(Commands::Spawn { command, .. }) if command.is_empty()
1527 ));
1528 }
1529
1530 #[test]
1531 fn test_cli_parse_spawn_idle_threshold() {
1532 let cli =
1533 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1534 assert!(matches!(
1535 &cli.command,
1536 Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1537 ));
1538 }
1539
1540 #[test]
1541 fn test_cli_parse_spawn_idle_threshold_60() {
1542 let cli =
1543 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "60"]).unwrap();
1544 assert!(matches!(
1545 &cli.command,
1546 Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(60)
1547 ));
1548 }
1549
1550 #[test]
1551 fn test_cli_parse_spawn_secrets() {
1552 let cli = Cli::try_parse_from([
1553 "pulpo",
1554 "spawn",
1555 "my-task",
1556 "--secret",
1557 "GITHUB_TOKEN",
1558 "--secret",
1559 "NPM_TOKEN",
1560 ])
1561 .unwrap();
1562 assert!(matches!(
1563 &cli.command,
1564 Some(Commands::Spawn { secret, .. }) if secret == &["GITHUB_TOKEN", "NPM_TOKEN"]
1565 ));
1566 }
1567
1568 #[test]
1569 fn test_cli_parse_spawn_no_secrets() {
1570 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1571 assert!(matches!(
1572 &cli.command,
1573 Some(Commands::Spawn { secret, .. }) if secret.is_empty()
1574 ));
1575 }
1576
1577 #[test]
1578 fn test_cli_parse_secret_set_with_env() {
1579 let cli = Cli::try_parse_from([
1580 "pulpo",
1581 "secret",
1582 "set",
1583 "GH_WORK",
1584 "token123",
1585 "--env",
1586 "GITHUB_TOKEN",
1587 ])
1588 .unwrap();
1589 assert!(matches!(
1590 &cli.command,
1591 Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1592 if name == "GH_WORK" && value == "token123" && env.as_deref() == Some("GITHUB_TOKEN")
1593 ));
1594 }
1595
1596 #[test]
1597 fn test_cli_parse_secret_set_without_env() {
1598 let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_KEY", "val"]).unwrap();
1599 assert!(matches!(
1600 &cli.command,
1601 Some(Commands::Secret { action: SecretAction::Set { name, value, env } })
1602 if name == "MY_KEY" && value == "val" && env.is_none()
1603 ));
1604 }
1605
1606 #[test]
1607 fn test_cli_parse_spawn_worktree() {
1608 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree"]).unwrap();
1609 assert!(matches!(
1610 &cli.command,
1611 Some(Commands::Spawn { worktree, .. }) if *worktree
1612 ));
1613 }
1614
1615 #[test]
1616 fn test_cli_parse_spawn_detach() {
1617 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
1618 assert!(matches!(
1619 &cli.command,
1620 Some(Commands::Spawn { detach, .. }) if *detach
1621 ));
1622 }
1623
1624 #[test]
1625 fn test_cli_parse_spawn_detach_short() {
1626 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
1627 assert!(matches!(
1628 &cli.command,
1629 Some(Commands::Spawn { detach, .. }) if *detach
1630 ));
1631 }
1632
1633 #[test]
1634 fn test_cli_parse_spawn_detach_default() {
1635 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1636 assert!(matches!(
1637 &cli.command,
1638 Some(Commands::Spawn { detach, .. }) if !detach
1639 ));
1640 }
1641
1642 #[test]
1643 fn test_cli_parse_logs() {
1644 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1645 assert!(matches!(
1646 &cli.command,
1647 Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 100 && !follow
1648 ));
1649 }
1650
1651 #[test]
1652 fn test_cli_parse_logs_with_lines() {
1653 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1654 assert!(matches!(
1655 &cli.command,
1656 Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 50 && !follow
1657 ));
1658 }
1659
1660 #[test]
1661 fn test_cli_parse_logs_follow() {
1662 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1663 assert!(matches!(
1664 &cli.command,
1665 Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1666 ));
1667 }
1668
1669 #[test]
1670 fn test_cli_parse_logs_follow_short() {
1671 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1672 assert!(matches!(
1673 &cli.command,
1674 Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1675 ));
1676 }
1677
1678 #[test]
1679 fn test_cli_parse_kill() {
1680 let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1681 assert!(matches!(
1682 &cli.command,
1683 Some(Commands::Kill { name }) if name == "my-session"
1684 ));
1685 }
1686
1687 #[test]
1688 fn test_cli_parse_delete() {
1689 let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1690 assert!(matches!(
1691 &cli.command,
1692 Some(Commands::Delete { name }) if name == "my-session"
1693 ));
1694 }
1695
1696 #[test]
1697 fn test_cli_parse_resume() {
1698 let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1699 assert!(matches!(
1700 &cli.command,
1701 Some(Commands::Resume { name }) if name == "my-session"
1702 ));
1703 }
1704
1705 #[test]
1706 fn test_cli_parse_input() {
1707 let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1708 assert!(matches!(
1709 &cli.command,
1710 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("yes")
1711 ));
1712 }
1713
1714 #[test]
1715 fn test_cli_parse_input_no_text() {
1716 let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1717 assert!(matches!(
1718 &cli.command,
1719 Some(Commands::Input { name, text }) if name == "my-session" && text.is_none()
1720 ));
1721 }
1722
1723 #[test]
1724 fn test_cli_parse_input_alias() {
1725 let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1726 assert!(matches!(
1727 &cli.command,
1728 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
1729 ));
1730 }
1731
1732 #[test]
1733 fn test_cli_parse_custom_node() {
1734 let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1735 assert_eq!(cli.node, "win-pc:8080");
1736 assert_eq!(cli.path.as_deref(), Some("list"));
1738 }
1739
1740 #[test]
1741 fn test_cli_version() {
1742 let result = Cli::try_parse_from(["pulpo", "--version"]);
1743 let err = result.unwrap_err();
1745 assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1746 }
1747
1748 #[test]
1749 fn test_cli_parse_no_subcommand_succeeds() {
1750 let cli = Cli::try_parse_from(["pulpo"]).unwrap();
1751 assert!(cli.command.is_none());
1752 assert!(cli.path.is_none());
1753 }
1754
1755 #[test]
1756 fn test_cli_debug() {
1757 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1758 let debug = format!("{cli:?}");
1759 assert!(debug.contains("List"));
1760 }
1761
1762 #[test]
1763 fn test_commands_debug() {
1764 let cmd = Commands::List { all: false };
1765 assert_eq!(format!("{cmd:?}"), "List { all: false }");
1766 }
1767
    /// JSON body for a single session named `repo` with status `active`,
    /// served by the stub server's session and resume routes and embedded in
    /// the create-session response.
    const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1770
1771 fn test_create_response_json() -> String {
1773 format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1774 }
1775
1776 async fn start_test_server() -> String {
1778 use axum::http::StatusCode;
1779 use axum::{
1780 Json, Router,
1781 routing::{get, post},
1782 };
1783
1784 let create_json = test_create_response_json();
1785
1786 let app = Router::new()
1787 .route(
1788 "/api/v1/sessions",
1789 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1790 (StatusCode::CREATED, create_json.clone())
1791 }),
1792 )
1793 .route(
1794 "/api/v1/sessions/{id}",
1795 get(|| async { TEST_SESSION_JSON.to_owned() })
1796 .delete(|| async { StatusCode::NO_CONTENT }),
1797 )
1798 .route(
1799 "/api/v1/sessions/{id}/kill",
1800 post(|| async { StatusCode::NO_CONTENT }),
1801 )
1802 .route(
1803 "/api/v1/sessions/{id}/output",
1804 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1805 )
1806 .route(
1807 "/api/v1/peers",
1808 get(|| async {
1809 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1810 }),
1811 )
1812 .route(
1813 "/api/v1/sessions/{id}/resume",
1814 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1815 )
1816 .route(
1817 "/api/v1/sessions/{id}/interventions",
1818 get(|| async { "[]".to_owned() }),
1819 )
1820 .route(
1821 "/api/v1/sessions/{id}/input",
1822 post(|| async { StatusCode::NO_CONTENT }),
1823 )
1824 .route(
1825 "/api/v1/schedules",
1826 get(|| async { Json::<Vec<()>>(vec![]) })
1827 .post(|| async { StatusCode::CREATED }),
1828 )
1829 .route(
1830 "/api/v1/schedules/{id}",
1831 axum::routing::put(|| async { StatusCode::OK })
1832 .delete(|| async { StatusCode::NO_CONTENT }),
1833 );
1834
1835 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1836 let addr = listener.local_addr().unwrap();
1837 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1838 format!("127.0.0.1:{}", addr.port())
1839 }
1840
1841 #[tokio::test]
1842 async fn test_execute_list_success() {
1843 let node = start_test_server().await;
1844 let cli = Cli {
1845 node,
1846 token: None,
1847 command: Some(Commands::List { all: false }),
1848 path: None,
1849 };
1850 let result = execute(&cli).await.unwrap();
1851 assert_eq!(result, "No sessions.");
1852 }
1853
1854 #[tokio::test]
1855 async fn test_execute_nodes_success() {
1856 let node = start_test_server().await;
1857 let cli = Cli {
1858 node,
1859 token: None,
1860 command: Some(Commands::Nodes),
1861 path: None,
1862 };
1863 let result = execute(&cli).await.unwrap();
1864 assert!(result.contains("test"));
1865 assert!(result.contains("(local)"));
1866 assert!(result.contains("NAME"));
1867 }
1868
1869 #[tokio::test]
1870 async fn test_execute_spawn_success() {
1871 let node = start_test_server().await;
1872 let cli = Cli {
1873 node,
1874 token: None,
1875 command: Some(Commands::Spawn {
1876 name: Some("test".into()),
1877 workdir: Some("/tmp/repo".into()),
1878 ink: None,
1879 description: None,
1880 detach: true,
1881 idle_threshold: None,
1882 auto: false,
1883 worktree: false,
1884 runtime: None,
1885 secret: vec![],
1886 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1887 }),
1888 path: None,
1889 };
1890 let result = execute(&cli).await.unwrap();
1891 assert!(result.contains("Created session"));
1892 assert!(result.contains("repo"));
1893 }
1894
1895 #[tokio::test]
1896 async fn test_execute_spawn_with_all_flags() {
1897 let node = start_test_server().await;
1898 let cli = Cli {
1899 node,
1900 token: None,
1901 command: Some(Commands::Spawn {
1902 name: Some("test".into()),
1903 workdir: Some("/tmp/repo".into()),
1904 ink: Some("coder".into()),
1905 description: Some("Fix the bug".into()),
1906 detach: true,
1907 idle_threshold: None,
1908 auto: false,
1909 worktree: false,
1910 runtime: None,
1911 secret: vec![],
1912 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1913 }),
1914 path: None,
1915 };
1916 let result = execute(&cli).await.unwrap();
1917 assert!(result.contains("Created session"));
1918 }
1919
1920 #[tokio::test]
1921 async fn test_execute_spawn_with_idle_threshold_and_worktree_and_docker_runtime() {
1922 let node = start_test_server().await;
1923 let cli = Cli {
1924 node,
1925 token: None,
1926 command: Some(Commands::Spawn {
1927 name: Some("full-opts".into()),
1928 workdir: Some("/tmp/repo".into()),
1929 ink: Some("coder".into()),
1930 description: Some("Full options".into()),
1931 detach: true,
1932 idle_threshold: Some(120),
1933 auto: false,
1934 worktree: true,
1935 runtime: Some("docker".into()),
1936 secret: vec![],
1937 command: vec!["claude".into()],
1938 }),
1939 path: None,
1940 };
1941 let result = execute(&cli).await.unwrap();
1942 assert!(result.contains("Created session"));
1943 }
1944
1945 #[tokio::test]
1946 async fn test_execute_spawn_no_name_derives_from_workdir() {
1947 let node = start_test_server().await;
1948 let cli = Cli {
1949 node,
1950 token: None,
1951 command: Some(Commands::Spawn {
1952 name: None,
1953 workdir: Some("/tmp/my-project".into()),
1954 ink: None,
1955 description: None,
1956 detach: true,
1957 idle_threshold: None,
1958 auto: false,
1959 worktree: false,
1960 runtime: None,
1961 secret: vec![],
1962 command: vec!["echo".into(), "hello".into()],
1963 }),
1964 path: None,
1965 };
1966 let result = execute(&cli).await.unwrap();
1967 assert!(result.contains("Created session"));
1968 }
1969
1970 #[tokio::test]
1971 async fn test_execute_spawn_no_command() {
1972 let node = start_test_server().await;
1973 let cli = Cli {
1974 node,
1975 token: None,
1976 command: Some(Commands::Spawn {
1977 name: Some("test".into()),
1978 workdir: Some("/tmp/repo".into()),
1979 ink: None,
1980 description: None,
1981 detach: true,
1982 idle_threshold: None,
1983 auto: false,
1984 worktree: false,
1985 runtime: None,
1986 secret: vec![],
1987 command: vec![],
1988 }),
1989 path: None,
1990 };
1991 let result = execute(&cli).await.unwrap();
1992 assert!(result.contains("Created session"));
1993 }
1994
1995 #[tokio::test]
1996 async fn test_execute_spawn_with_name() {
1997 let node = start_test_server().await;
1998 let cli = Cli {
1999 node,
2000 token: None,
2001 command: Some(Commands::Spawn {
2002 name: Some("my-task".into()),
2003 workdir: Some("/tmp/repo".into()),
2004 ink: None,
2005 description: None,
2006 detach: true,
2007 idle_threshold: None,
2008 auto: false,
2009 worktree: false,
2010 runtime: None,
2011 secret: vec![],
2012 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2013 }),
2014 path: None,
2015 };
2016 let result = execute(&cli).await.unwrap();
2017 assert!(result.contains("Created session"));
2018 }
2019
2020 #[tokio::test]
2021 async fn test_execute_spawn_auto_attach() {
2022 let node = start_test_server().await;
2023 let cli = Cli {
2024 node,
2025 token: None,
2026 command: Some(Commands::Spawn {
2027 name: Some("test".into()),
2028 workdir: Some("/tmp/repo".into()),
2029 ink: None,
2030 description: None,
2031 detach: false,
2032 idle_threshold: None,
2033 auto: false,
2034 worktree: false,
2035 runtime: None,
2036 secret: vec![],
2037 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
2038 }),
2039 path: None,
2040 };
2041 let result = execute(&cli).await.unwrap();
2042 assert!(result.contains("Detached from session"));
2044 }
2045
2046 #[tokio::test]
2047 async fn test_execute_kill_success() {
2048 let node = start_test_server().await;
2049 let cli = Cli {
2050 node,
2051 token: None,
2052 command: Some(Commands::Kill {
2053 name: "test-session".into(),
2054 }),
2055 path: None,
2056 };
2057 let result = execute(&cli).await.unwrap();
2058 assert!(result.contains("killed"));
2059 }
2060
2061 #[tokio::test]
2062 async fn test_execute_delete_success() {
2063 let node = start_test_server().await;
2064 let cli = Cli {
2065 node,
2066 token: None,
2067 command: Some(Commands::Delete {
2068 name: "test-session".into(),
2069 }),
2070 path: None,
2071 };
2072 let result = execute(&cli).await.unwrap();
2073 assert!(result.contains("deleted"));
2074 }
2075
2076 #[tokio::test]
2077 async fn test_execute_logs_success() {
2078 let node = start_test_server().await;
2079 let cli = Cli {
2080 node,
2081 token: None,
2082 command: Some(Commands::Logs {
2083 name: "test-session".into(),
2084 lines: 50,
2085 follow: false,
2086 }),
2087 path: None,
2088 };
2089 let result = execute(&cli).await.unwrap();
2090 assert!(result.contains("test output"));
2091 }
2092
2093 #[tokio::test]
2094 async fn test_execute_list_connection_refused() {
2095 let cli = Cli {
2096 node: "localhost:1".into(),
2097 token: None,
2098 command: Some(Commands::List { all: false }),
2099 path: None,
2100 };
2101 let result = execute(&cli).await;
2102 let err = result.unwrap_err().to_string();
2103 assert!(
2104 err.contains("Could not connect to pulpod"),
2105 "Expected friendly error, got: {err}"
2106 );
2107 assert!(err.contains("localhost:1"));
2108 }
2109
2110 #[tokio::test]
2111 async fn test_execute_nodes_connection_refused() {
2112 let cli = Cli {
2113 node: "localhost:1".into(),
2114 token: None,
2115 command: Some(Commands::Nodes),
2116 path: None,
2117 };
2118 let result = execute(&cli).await;
2119 let err = result.unwrap_err().to_string();
2120 assert!(err.contains("Could not connect to pulpod"));
2121 }
2122
2123 #[tokio::test]
2124 async fn test_execute_kill_error_response() {
2125 use axum::{Router, http::StatusCode, routing::post};
2126
2127 let app = Router::new().route(
2128 "/api/v1/sessions/{id}/kill",
2129 post(|| async {
2130 (
2131 StatusCode::NOT_FOUND,
2132 "{\"error\":\"session not found: test-session\"}",
2133 )
2134 }),
2135 );
2136 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2137 let addr = listener.local_addr().unwrap();
2138 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2139 let node = format!("127.0.0.1:{}", addr.port());
2140
2141 let cli = Cli {
2142 node,
2143 token: None,
2144 command: Some(Commands::Kill {
2145 name: "test-session".into(),
2146 }),
2147 path: None,
2148 };
2149 let err = execute(&cli).await.unwrap_err();
2150 assert_eq!(err.to_string(), "session not found: test-session");
2151 }
2152
2153 #[tokio::test]
2154 async fn test_execute_delete_error_response() {
2155 use axum::{Router, http::StatusCode, routing::delete};
2156
2157 let app = Router::new().route(
2158 "/api/v1/sessions/{id}",
2159 delete(|| async {
2160 (
2161 StatusCode::CONFLICT,
2162 "{\"error\":\"cannot delete session in 'running' state\"}",
2163 )
2164 }),
2165 );
2166 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2167 let addr = listener.local_addr().unwrap();
2168 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2169 let node = format!("127.0.0.1:{}", addr.port());
2170
2171 let cli = Cli {
2172 node,
2173 token: None,
2174 command: Some(Commands::Delete {
2175 name: "test-session".into(),
2176 }),
2177 path: None,
2178 };
2179 let err = execute(&cli).await.unwrap_err();
2180 assert_eq!(err.to_string(), "cannot delete session in 'running' state");
2181 }
2182
2183 #[tokio::test]
2184 async fn test_execute_logs_error_response() {
2185 use axum::{Router, http::StatusCode, routing::get};
2186
2187 let app = Router::new().route(
2188 "/api/v1/sessions/{id}/output",
2189 get(|| async {
2190 (
2191 StatusCode::NOT_FOUND,
2192 "{\"error\":\"session not found: ghost\"}",
2193 )
2194 }),
2195 );
2196 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2197 let addr = listener.local_addr().unwrap();
2198 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2199 let node = format!("127.0.0.1:{}", addr.port());
2200
2201 let cli = Cli {
2202 node,
2203 token: None,
2204 command: Some(Commands::Logs {
2205 name: "ghost".into(),
2206 lines: 50,
2207 follow: false,
2208 }),
2209 path: None,
2210 };
2211 let err = execute(&cli).await.unwrap_err();
2212 assert_eq!(err.to_string(), "session not found: ghost");
2213 }
2214
2215 #[tokio::test]
2216 async fn test_execute_resume_error_response() {
2217 use axum::{Router, http::StatusCode, routing::post};
2218
2219 let app = Router::new().route(
2220 "/api/v1/sessions/{id}/resume",
2221 post(|| async {
2222 (
2223 StatusCode::BAD_REQUEST,
2224 "{\"error\":\"session is not lost (status: active)\"}",
2225 )
2226 }),
2227 );
2228 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2229 let addr = listener.local_addr().unwrap();
2230 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2231 let node = format!("127.0.0.1:{}", addr.port());
2232
2233 let cli = Cli {
2234 node,
2235 token: None,
2236 command: Some(Commands::Resume {
2237 name: "test-session".into(),
2238 }),
2239 path: None,
2240 };
2241 let err = execute(&cli).await.unwrap_err();
2242 assert_eq!(err.to_string(), "session is not lost (status: active)");
2243 }
2244
2245 #[tokio::test]
2246 async fn test_execute_spawn_error_response() {
2247 use axum::{Router, http::StatusCode, routing::post};
2248
2249 let app = Router::new().route(
2250 "/api/v1/sessions",
2251 post(|| async {
2252 (
2253 StatusCode::INTERNAL_SERVER_ERROR,
2254 "{\"error\":\"failed to spawn session\"}",
2255 )
2256 }),
2257 );
2258 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2259 let addr = listener.local_addr().unwrap();
2260 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2261 let node = format!("127.0.0.1:{}", addr.port());
2262
2263 let cli = Cli {
2264 node,
2265 token: None,
2266 command: Some(Commands::Spawn {
2267 name: Some("test".into()),
2268 workdir: Some("/tmp/repo".into()),
2269 ink: None,
2270 description: None,
2271 detach: true,
2272 idle_threshold: None,
2273 auto: false,
2274 worktree: false,
2275 runtime: None,
2276 secret: vec![],
2277 command: vec!["test".into()],
2278 }),
2279 path: None,
2280 };
2281 let err = execute(&cli).await.unwrap_err();
2282 assert_eq!(err.to_string(), "failed to spawn session");
2283 }
2284
2285 #[tokio::test]
2286 async fn test_execute_interventions_error_response() {
2287 use axum::{Router, http::StatusCode, routing::get};
2288
2289 let app = Router::new().route(
2290 "/api/v1/sessions/{id}/interventions",
2291 get(|| async {
2292 (
2293 StatusCode::NOT_FOUND,
2294 "{\"error\":\"session not found: ghost\"}",
2295 )
2296 }),
2297 );
2298 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2299 let addr = listener.local_addr().unwrap();
2300 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2301 let node = format!("127.0.0.1:{}", addr.port());
2302
2303 let cli = Cli {
2304 node,
2305 token: None,
2306 command: Some(Commands::Interventions {
2307 name: "ghost".into(),
2308 }),
2309 path: None,
2310 };
2311 let err = execute(&cli).await.unwrap_err();
2312 assert_eq!(err.to_string(), "session not found: ghost");
2313 }
2314
2315 #[tokio::test]
2316 async fn test_execute_resume_success() {
2317 let node = start_test_server().await;
2318 let cli = Cli {
2319 node,
2320 token: None,
2321 command: Some(Commands::Resume {
2322 name: "test-session".into(),
2323 }),
2324 path: None,
2325 };
2326 let result = execute(&cli).await.unwrap();
2327 assert!(result.contains("Detached from session"));
2328 }
2329
2330 #[tokio::test]
2331 async fn test_execute_input_success() {
2332 let node = start_test_server().await;
2333 let cli = Cli {
2334 node,
2335 token: None,
2336 command: Some(Commands::Input {
2337 name: "test-session".into(),
2338 text: Some("yes".into()),
2339 }),
2340 path: None,
2341 };
2342 let result = execute(&cli).await.unwrap();
2343 assert!(result.contains("Sent input to session test-session"));
2344 }
2345
2346 #[tokio::test]
2347 async fn test_execute_input_no_text() {
2348 let node = start_test_server().await;
2349 let cli = Cli {
2350 node,
2351 token: None,
2352 command: Some(Commands::Input {
2353 name: "test-session".into(),
2354 text: None,
2355 }),
2356 path: None,
2357 };
2358 let result = execute(&cli).await.unwrap();
2359 assert!(result.contains("Sent input to session test-session"));
2360 }
2361
2362 #[tokio::test]
2363 async fn test_execute_input_connection_refused() {
2364 let cli = Cli {
2365 node: "localhost:1".into(),
2366 token: None,
2367 command: Some(Commands::Input {
2368 name: "test".into(),
2369 text: Some("y".into()),
2370 }),
2371 path: None,
2372 };
2373 let result = execute(&cli).await;
2374 let err = result.unwrap_err().to_string();
2375 assert!(err.contains("Could not connect to pulpod"));
2376 }
2377
2378 #[tokio::test]
2379 async fn test_execute_input_error_response() {
2380 use axum::{Router, http::StatusCode, routing::post};
2381
2382 let app = Router::new().route(
2383 "/api/v1/sessions/{id}/input",
2384 post(|| async {
2385 (
2386 StatusCode::NOT_FOUND,
2387 "{\"error\":\"session not found: ghost\"}",
2388 )
2389 }),
2390 );
2391 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2392 let addr = listener.local_addr().unwrap();
2393 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2394 let node = format!("127.0.0.1:{}", addr.port());
2395
2396 let cli = Cli {
2397 node,
2398 token: None,
2399 command: Some(Commands::Input {
2400 name: "ghost".into(),
2401 text: Some("y".into()),
2402 }),
2403 path: None,
2404 };
2405 let err = execute(&cli).await.unwrap_err();
2406 assert_eq!(err.to_string(), "session not found: ghost");
2407 }
2408
2409 #[tokio::test]
2410 async fn test_execute_ui() {
2411 let cli = Cli {
2412 node: "localhost:7433".into(),
2413 token: None,
2414 command: Some(Commands::Ui),
2415 path: None,
2416 };
2417 let result = execute(&cli).await.unwrap();
2418 assert!(result.contains("Opening"));
2419 assert!(result.contains("http://localhost:7433"));
2420 }
2421
2422 #[tokio::test]
2423 async fn test_execute_ui_custom_node() {
2424 let cli = Cli {
2425 node: "mac-mini:7433".into(),
2426 token: None,
2427 command: Some(Commands::Ui),
2428 path: None,
2429 };
2430 let result = execute(&cli).await.unwrap();
2431 assert!(result.contains("http://mac-mini:7433"));
2432 }
2433
2434 #[test]
2435 fn test_format_sessions_empty() {
2436 assert_eq!(format_sessions(&[]), "No sessions.");
2437 }
2438
2439 #[test]
2440 fn test_format_sessions_with_data() {
2441 use chrono::Utc;
2442 use pulpo_common::session::SessionStatus;
2443 use uuid::Uuid;
2444
2445 let sessions = vec![Session {
2446 id: Uuid::nil(),
2447 name: "my-api".into(),
2448 workdir: "/tmp/repo".into(),
2449 command: "claude -p 'Fix the bug'".into(),
2450 description: Some("Fix the bug".into()),
2451 status: SessionStatus::Active,
2452 exit_code: None,
2453 backend_session_id: None,
2454 output_snapshot: None,
2455 metadata: None,
2456 ink: None,
2457 intervention_code: None,
2458 intervention_reason: None,
2459 intervention_at: None,
2460 last_output_at: None,
2461 idle_since: None,
2462 idle_threshold_secs: None,
2463 worktree_path: None,
2464 runtime: Runtime::Tmux,
2465 created_at: Utc::now(),
2466 updated_at: Utc::now(),
2467 }];
2468 let output = format_sessions(&sessions);
2469 assert!(output.contains("ID"));
2470 assert!(output.contains("NAME"));
2471 assert!(output.contains("RUNTIME"));
2472 assert!(output.contains("COMMAND"));
2473 assert!(output.contains("00000000"));
2474 assert!(output.contains("my-api"));
2475 assert!(output.contains("active"));
2476 assert!(output.contains("tmux"));
2477 assert!(output.contains("claude -p 'Fix the bug'"));
2478 }
2479
/// A session whose runtime is `Runtime::Docker` must show "docker" in the listing.
#[test]
fn test_format_sessions_docker_runtime() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let sessions = [Session {
        id: Uuid::nil(),
        name: "sandbox-test".into(),
        status: SessionStatus::Active,
        runtime: Runtime::Docker,
        command: "claude".into(),
        workdir: "/tmp".into(),
        description: None,
        worktree_path: None,
        metadata: None,
        backend_session_id: Some("docker:pulpo-sandbox-test".into()),
        exit_code: None,
        output_snapshot: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    }];

    let listing = format_sessions(&sessions);

    assert!(listing.contains("docker"));
}
2512
/// A very long command must be rendered with an ellipsis ("...") in the listing.
#[test]
fn test_format_sessions_long_command_truncated() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let sessions = [Session {
        id: Uuid::nil(),
        name: "test".into(),
        status: SessionStatus::Ready,
        runtime: Runtime::Tmux,
        command:
            "claude -p 'A very long command that exceeds fifty characters in total length here'"
                .into(),
        workdir: "/tmp".into(),
        description: None,
        worktree_path: None,
        metadata: None,
        backend_session_id: None,
        exit_code: None,
        output_snapshot: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    }];

    let listing = format_sessions(&sessions);

    assert!(listing.contains("..."));
}
2547
/// A session with a `worktree_path` must be listed with a "[wt]" marker
/// directly after its name.
#[test]
fn test_format_sessions_worktree_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let sessions = [Session {
        id: Uuid::nil(),
        name: "wt-task".into(),
        status: SessionStatus::Active,
        runtime: Runtime::Tmux,
        command: "claude".into(),
        workdir: "/repo/.pulpo/worktrees/wt-task".into(),
        description: None,
        worktree_path: Some("/repo/.pulpo/worktrees/wt-task".into()),
        metadata: None,
        backend_session_id: None,
        exit_code: None,
        output_snapshot: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    }];

    let output = format_sessions(&sessions);

    let has_marker = output.contains("[wt]");
    assert!(has_marker, "should show worktree indicator: {output}");
    // The marker must sit right next to the session name.
    let marker_follows_name = output.contains("wt-task [wt]");
    assert!(marker_follows_name);
}
2584
/// A session whose metadata carries a `pr_url` entry must be listed with a
/// "[PR]" marker directly after its name.
#[test]
fn test_format_sessions_pr_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use std::collections::HashMap;
    use uuid::Uuid;

    let mut pr_metadata = HashMap::new();
    pr_metadata.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());

    let sessions = [Session {
        id: Uuid::nil(),
        name: "pr-task".into(),
        status: SessionStatus::Active,
        runtime: Runtime::Tmux,
        command: "claude".into(),
        workdir: "/tmp".into(),
        description: None,
        worktree_path: None,
        metadata: Some(pr_metadata),
        backend_session_id: None,
        exit_code: None,
        output_snapshot: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    }];

    let output = format_sessions(&sessions);

    let has_marker = output.contains("[PR]");
    assert!(has_marker, "should show PR indicator: {output}");
    // The marker must sit right next to the session name.
    let marker_follows_name = output.contains("pr-task [PR]");
    assert!(marker_follows_name);
}
2624
/// A session with both a worktree path and a `pr_url` metadata entry must
/// show the combined "[wt] [PR]" marker.
#[test]
fn test_format_sessions_worktree_and_pr_indicator() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use std::collections::HashMap;
    use uuid::Uuid;

    let mut pr_metadata = HashMap::new();
    pr_metadata.insert("pr_url".into(), "https://github.com/a/b/pull/1".into());

    let sessions = [Session {
        id: Uuid::nil(),
        name: "both-task".into(),
        status: SessionStatus::Active,
        runtime: Runtime::Tmux,
        command: "claude".into(),
        workdir: "/tmp".into(),
        description: None,
        worktree_path: Some("/repo/.pulpo/worktrees/both-task".into()),
        metadata: Some(pr_metadata),
        backend_session_id: None,
        exit_code: None,
        output_snapshot: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    }];

    let output = format_sessions(&sessions);

    let has_both = output.contains("[wt] [PR]");
    assert!(has_both, "should show both indicators: {output}");
}
2663
/// A session without metadata must not be listed with the "[PR]" marker.
#[test]
fn test_format_sessions_no_pr_without_metadata() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let sessions = [Session {
        id: Uuid::nil(),
        name: "no-pr".into(),
        status: SessionStatus::Active,
        runtime: Runtime::Tmux,
        command: "claude".into(),
        workdir: "/tmp".into(),
        description: None,
        worktree_path: None,
        metadata: None,
        backend_session_id: None,
        exit_code: None,
        output_snapshot: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    }];

    let output = format_sessions(&sessions);

    let has_marker = output.contains("[PR]");
    assert!(!has_marker, "should not show PR indicator: {output}");
}
2699
/// The node listing must mention the local node (tagged "(local)"), the
/// configured peer, and the peer's session count.
#[test]
fn test_format_nodes() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "mac-mini".into(),
        hostname: "h".into(),
        os: "macos".into(),
        arch: "arm64".into(),
        cpus: 8,
        memory_mb: 16384,
        gpu: None,
    };
    let online_peer = PeerInfo {
        name: "win-pc".into(),
        address: "win-pc:7433".into(),
        status: PeerStatus::Online,
        source: PeerSource::Configured,
        session_count: Some(3),
        node_info: None,
    };
    let resp = PeersResponse {
        local,
        peers: vec![online_peer],
    };

    let listing = format_nodes(&resp);

    for fragment in ["mac-mini", "(local)", "win-pc"] {
        assert!(listing.contains(fragment));
    }
    // NOTE(review): the peer address "win-pc:7433" also contains the digit 3,
    // so if the address is printed this check may pass regardless of the
    // session count — confirm against format_nodes.
    assert!(listing.contains('3'));
}
2730
/// An offline peer without a session count must be shown as "offline", and
/// the third output line must contain a '-' character.
#[test]
fn test_format_nodes_no_session_count() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "local".into(),
        hostname: "h".into(),
        os: "linux".into(),
        arch: "x86_64".into(),
        cpus: 4,
        memory_mb: 8192,
        gpu: None,
    };
    let offline_peer = PeerInfo {
        name: "peer".into(),
        address: "peer:7433".into(),
        status: PeerStatus::Offline,
        source: PeerSource::Configured,
        session_count: None,
        node_info: None,
    };
    let resp = PeersResponse {
        local,
        peers: vec![offline_peer],
    };

    let listing = format_nodes(&resp);
    assert!(listing.contains("offline"));

    // Index 2 is the third line of the rendered listing.
    let rows: Vec<&str> = listing.lines().collect();
    let third_row = rows[2];
    assert!(third_row.contains('-'));
}
2761
2762 #[tokio::test]
2763 async fn test_execute_resume_connection_refused() {
2764 let cli = Cli {
2765 node: "localhost:1".into(),
2766 token: None,
2767 command: Some(Commands::Resume {
2768 name: "test".into(),
2769 }),
2770 path: None,
2771 };
2772 let result = execute(&cli).await;
2773 let err = result.unwrap_err().to_string();
2774 assert!(err.contains("Could not connect to pulpod"));
2775 }
2776
2777 #[tokio::test]
2778 async fn test_execute_spawn_connection_refused() {
2779 let cli = Cli {
2780 node: "localhost:1".into(),
2781 token: None,
2782 command: Some(Commands::Spawn {
2783 name: Some("test".into()),
2784 workdir: Some("/tmp".into()),
2785 ink: None,
2786 description: None,
2787 detach: true,
2788 idle_threshold: None,
2789 auto: false,
2790 worktree: false,
2791 runtime: None,
2792 secret: vec![],
2793 command: vec!["test".into()],
2794 }),
2795 path: None,
2796 };
2797 let result = execute(&cli).await;
2798 let err = result.unwrap_err().to_string();
2799 assert!(err.contains("Could not connect to pulpod"));
2800 }
2801
2802 #[tokio::test]
2803 async fn test_execute_kill_connection_refused() {
2804 let cli = Cli {
2805 node: "localhost:1".into(),
2806 token: None,
2807 command: Some(Commands::Kill {
2808 name: "test".into(),
2809 }),
2810 path: None,
2811 };
2812 let result = execute(&cli).await;
2813 let err = result.unwrap_err().to_string();
2814 assert!(err.contains("Could not connect to pulpod"));
2815 }
2816
2817 #[tokio::test]
2818 async fn test_execute_delete_connection_refused() {
2819 let cli = Cli {
2820 node: "localhost:1".into(),
2821 token: None,
2822 command: Some(Commands::Delete {
2823 name: "test".into(),
2824 }),
2825 path: None,
2826 };
2827 let result = execute(&cli).await;
2828 let err = result.unwrap_err().to_string();
2829 assert!(err.contains("Could not connect to pulpod"));
2830 }
2831
2832 #[tokio::test]
2833 async fn test_execute_logs_connection_refused() {
2834 let cli = Cli {
2835 node: "localhost:1".into(),
2836 token: None,
2837 command: Some(Commands::Logs {
2838 name: "test".into(),
2839 lines: 50,
2840 follow: false,
2841 }),
2842 path: None,
2843 };
2844 let result = execute(&cli).await;
2845 let err = result.unwrap_err().to_string();
2846 assert!(err.contains("Could not connect to pulpod"));
2847 }
2848
2849 #[tokio::test]
2850 async fn test_friendly_error_connect() {
2851 let err = reqwest::Client::new()
2853 .get("http://127.0.0.1:1")
2854 .send()
2855 .await
2856 .unwrap_err();
2857 let friendly = friendly_error(&err, "test-node:1");
2858 let msg = friendly.to_string();
2859 assert!(
2860 msg.contains("Could not connect"),
2861 "Expected connect message, got: {msg}"
2862 );
2863 }
2864
2865 #[tokio::test]
2866 async fn test_friendly_error_other() {
2867 let err = reqwest::Client::new()
2869 .get("http://[::invalid::url")
2870 .send()
2871 .await
2872 .unwrap_err();
2873 let friendly = friendly_error(&err, "bad-host");
2874 let msg = friendly.to_string();
2875 assert!(
2876 msg.contains("Network error"),
2877 "Expected network error message, got: {msg}"
2878 );
2879 assert!(msg.contains("bad-host"));
2880 }
2881
/// `is_localhost` must accept the loopback spellings (with and without a
/// port) and reject other hostnames and addresses.
#[test]
fn test_is_localhost_variants() {
    let loopback = [
        "localhost:7433",
        "127.0.0.1:7433",
        "[::1]:7433",
        "::1",
        "localhost",
    ];
    for node in loopback {
        assert!(is_localhost(node), "{node} should count as localhost");
    }

    let remote = ["mac-mini:7433", "192.168.1.100:7433"];
    for node in remote {
        assert!(!is_localhost(node), "{node} should not count as localhost");
    }
}
2894
/// `authed_get` with a token must attach an `Authorization: Bearer <token>` header.
#[test]
fn test_authed_get_with_token() {
    let http = reqwest::Client::new();
    let builder = authed_get(&http, "http://h:1/api".into(), Some("tok"));
    let request = builder.build().unwrap();

    let header = request.headers().get("authorization").unwrap();

    assert_eq!(header.to_str().unwrap(), "Bearer tok");
}
2909
/// `authed_get` without a token must not set an `Authorization` header.
#[test]
fn test_authed_get_without_token() {
    let http = reqwest::Client::new();
    let builder = authed_get(&http, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();

    let header = request.headers().get("authorization");

    assert!(header.is_none());
}
2918
/// `authed_post` with a token must attach an `Authorization: Bearer <token>` header.
#[test]
fn test_authed_post_with_token() {
    let http = reqwest::Client::new();
    let builder = authed_post(&http, "http://h:1/api".into(), Some("secret"));
    let request = builder.build().unwrap();

    let header = request.headers().get("authorization").unwrap();

    assert_eq!(header.to_str().unwrap(), "Bearer secret");
}
2933
/// `authed_post` without a token must not set an `Authorization` header.
#[test]
fn test_authed_post_without_token() {
    let http = reqwest::Client::new();
    let builder = authed_post(&http, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();

    let header = request.headers().get("authorization");

    assert!(header.is_none());
}
2942
/// `authed_delete` with a token must attach an `Authorization: Bearer <token>` header.
#[test]
fn test_authed_delete_with_token() {
    let http = reqwest::Client::new();
    let builder = authed_delete(&http, "http://h:1/api".into(), Some("del-tok"));
    let request = builder.build().unwrap();

    let header = request.headers().get("authorization").unwrap();

    assert_eq!(header.to_str().unwrap(), "Bearer del-tok");
}
2957
/// `authed_delete` without a token must not set an `Authorization` header.
#[test]
fn test_authed_delete_without_token() {
    let http = reqwest::Client::new();
    let builder = authed_delete(&http, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();

    let header = request.headers().get("authorization");

    assert!(header.is_none());
}
2966
2967 #[tokio::test]
2968 async fn test_resolve_token_explicit() {
2969 let client = reqwest::Client::new();
2970 let token =
2971 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2972 assert_eq!(token, Some("my-tok".into()));
2973 }
2974
2975 #[tokio::test]
2976 async fn test_resolve_token_remote_no_explicit() {
2977 let client = reqwest::Client::new();
2978 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2979 assert_eq!(token, None);
2980 }
2981
2982 #[tokio::test]
2983 async fn test_resolve_token_localhost_auto_discover() {
2984 use axum::{Json, Router, routing::get};
2985
2986 let app = Router::new().route(
2987 "/api/v1/auth/token",
2988 get(|| async {
2989 Json(AuthTokenResponse {
2990 token: "discovered".into(),
2991 })
2992 }),
2993 );
2994 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2995 let addr = listener.local_addr().unwrap();
2996 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2997
2998 let node = format!("localhost:{}", addr.port());
2999 let base = base_url(&node);
3000 let client = reqwest::Client::new();
3001 let token = resolve_token(&client, &base, &node, None).await;
3002 assert_eq!(token, Some("discovered".into()));
3003 }
3004
3005 #[tokio::test]
3006 async fn test_discover_token_empty_returns_none() {
3007 use axum::{Json, Router, routing::get};
3008
3009 let app = Router::new().route(
3010 "/api/v1/auth/token",
3011 get(|| async {
3012 Json(AuthTokenResponse {
3013 token: String::new(),
3014 })
3015 }),
3016 );
3017 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3018 let addr = listener.local_addr().unwrap();
3019 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3020
3021 let base = format!("http://127.0.0.1:{}", addr.port());
3022 let client = reqwest::Client::new();
3023 assert_eq!(discover_token(&client, &base).await, None);
3024 }
3025
3026 #[tokio::test]
3027 async fn test_discover_token_unreachable_returns_none() {
3028 let client = reqwest::Client::new();
3029 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
3030 }
3031
/// `--token <value>` given before the subcommand must populate `Cli::token`.
#[test]
fn test_cli_parse_with_token() {
    let argv = ["pulpo", "--token", "my-secret", "list"];

    let parsed = Cli::try_parse_from(argv).unwrap();

    assert_eq!(parsed.token.as_deref(), Some("my-secret"));
}
3037
/// Without `--token` on the command line, `Cli::token` must be `None`.
#[test]
fn test_cli_parse_without_token() {
    let argv = ["pulpo", "list"];

    let parsed = Cli::try_parse_from(argv).unwrap();

    assert_eq!(parsed.token, None);
}
3043
3044 #[tokio::test]
3045 async fn test_execute_with_explicit_token_sends_header() {
3046 use axum::{Router, extract::Request, http::StatusCode, routing::get};
3047
3048 let app = Router::new().route(
3049 "/api/v1/sessions",
3050 get(|req: Request| async move {
3051 let auth = req
3052 .headers()
3053 .get("authorization")
3054 .and_then(|v| v.to_str().ok())
3055 .unwrap_or("");
3056 assert_eq!(auth, "Bearer test-token");
3057 (StatusCode::OK, "[]".to_owned())
3058 }),
3059 );
3060 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3061 let addr = listener.local_addr().unwrap();
3062 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3063 let node = format!("127.0.0.1:{}", addr.port());
3064
3065 let cli = Cli {
3066 node,
3067 token: Some("test-token".into()),
3068 command: Some(Commands::List { all: false }),
3069 path: None,
3070 };
3071 let result = execute(&cli).await.unwrap();
3072 assert_eq!(result, "No sessions.");
3073 }
3074
/// `pulpo interventions <name>` must parse into `Commands::Interventions`
/// carrying the given session name.
#[test]
fn test_cli_parse_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();

    let is_expected = matches!(
        parsed.command,
        Some(Commands::Interventions { ref name }) if name == "my-session"
    );

    assert!(is_expected);
}
3085
/// An empty event slice renders as the fixed "No intervention events." placeholder.
#[test]
fn test_format_interventions_empty() {
    let rendered = format_interventions(&[]);
    assert_eq!(rendered, "No intervention events.");
}
3090
/// Two intervention events must render a table containing the column
/// headers, both reasons and the first event's timestamp.
#[test]
fn test_format_interventions_with_data() {
    let memory_event = InterventionEventResponse {
        id: 1,
        session_id: "sess-1".into(),
        reason: "Memory exceeded threshold".into(),
        created_at: "2026-01-01T00:00:00Z".into(),
        code: None,
    };
    let idle_event = InterventionEventResponse {
        id: 2,
        session_id: "sess-1".into(),
        reason: "Idle for 10 minutes".into(),
        created_at: "2026-01-02T00:00:00Z".into(),
        code: None,
    };
    let events = vec![memory_event, idle_event];

    let table = format_interventions(&events);

    let expected_fragments = [
        "ID",
        "TIMESTAMP",
        "REASON",
        "Memory exceeded threshold",
        "Idle for 10 minutes",
        "2026-01-01T00:00:00Z",
    ];
    for fragment in expected_fragments {
        assert!(table.contains(fragment), "missing {fragment:?} in: {table}");
    }
}
3117
3118 #[tokio::test]
3119 async fn test_execute_interventions_empty() {
3120 let node = start_test_server().await;
3121 let cli = Cli {
3122 node,
3123 token: None,
3124 command: Some(Commands::Interventions {
3125 name: "my-session".into(),
3126 }),
3127 path: None,
3128 };
3129 let result = execute(&cli).await.unwrap();
3130 assert_eq!(result, "No intervention events.");
3131 }
3132
3133 #[tokio::test]
3134 async fn test_execute_interventions_with_data() {
3135 use axum::{Router, routing::get};
3136
3137 let app = Router::new().route(
3138 "/api/v1/sessions/{id}/interventions",
3139 get(|| async {
3140 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
3141 .to_owned()
3142 }),
3143 );
3144 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3145 let addr = listener.local_addr().unwrap();
3146 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3147 let node = format!("127.0.0.1:{}", addr.port());
3148
3149 let cli = Cli {
3150 node,
3151 token: None,
3152 command: Some(Commands::Interventions {
3153 name: "test".into(),
3154 }),
3155 path: None,
3156 };
3157 let result = execute(&cli).await.unwrap();
3158 assert!(result.contains("OOM"));
3159 assert!(result.contains("2026-01-01T00:00:00Z"));
3160 }
3161
3162 #[tokio::test]
3163 async fn test_execute_interventions_connection_refused() {
3164 let cli = Cli {
3165 node: "localhost:1".into(),
3166 token: None,
3167 command: Some(Commands::Interventions {
3168 name: "test".into(),
3169 }),
3170 path: None,
3171 };
3172 let result = execute(&cli).await;
3173 let err = result.unwrap_err().to_string();
3174 assert!(err.contains("Could not connect to pulpod"));
3175 }
3176
/// A plain backend id must produce `tmux attach-session -t <id>`.
#[test]
fn test_build_attach_command_tmux() {
    let attach = build_attach_command("my-session");

    let program = attach.get_program();
    let arguments: Vec<&std::ffi::OsStr> = attach.get_args().collect();

    assert_eq!(program, "tmux");
    assert_eq!(arguments, vec!["attach-session", "-t", "my-session"]);
}
3186
/// A `docker:`-prefixed backend id must produce
/// `docker exec -it <container> /bin/sh` with the prefix removed.
#[test]
fn test_build_attach_command_docker() {
    let attach = build_attach_command("docker:pulpo-my-task");

    let program = attach.get_program();
    let arguments: Vec<&std::ffi::OsStr> = attach.get_args().collect();

    assert_eq!(program, "docker");
    assert_eq!(arguments, vec!["exec", "-it", "pulpo-my-task", "/bin/sh"]);
}
3194
/// `pulpo attach <name>` must parse into `Commands::Attach` with that name.
#[test]
fn test_cli_parse_attach() {
    let parsed = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();

    let is_expected = matches!(
        parsed.command,
        Some(Commands::Attach { ref name }) if name == "my-session"
    );

    assert!(is_expected);
}
3203
/// The `a` alias must parse into `Commands::Attach` with the given name.
#[test]
fn test_cli_parse_attach_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();

    let is_expected = matches!(
        parsed.command,
        Some(Commands::Attach { ref name }) if name == "my-session"
    );

    assert!(is_expected);
}
3212
3213 #[tokio::test]
3214 async fn test_execute_attach_success() {
3215 let node = start_test_server().await;
3216 let cli = Cli {
3217 node,
3218 token: None,
3219 command: Some(Commands::Attach {
3220 name: "test-session".into(),
3221 }),
3222 path: None,
3223 };
3224 let result = execute(&cli).await.unwrap();
3225 assert!(result.contains("Detached from session test-session"));
3226 }
3227
3228 #[tokio::test]
3229 async fn test_execute_attach_with_backend_session_id() {
3230 use axum::{Router, routing::get};
3231 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3232 let app = Router::new().route(
3233 "/api/v1/sessions/{id}",
3234 get(move || async move { session_json.to_owned() }),
3235 );
3236 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3237 let addr = listener.local_addr().unwrap();
3238 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3239
3240 let cli = Cli {
3241 node: format!("127.0.0.1:{}", addr.port()),
3242 token: None,
3243 command: Some(Commands::Attach {
3244 name: "my-session".into(),
3245 }),
3246 path: None,
3247 };
3248 let result = execute(&cli).await.unwrap();
3249 assert!(result.contains("Detached from session my-session"));
3250 }
3251
3252 #[tokio::test]
3253 async fn test_execute_attach_connection_refused() {
3254 let cli = Cli {
3255 node: "localhost:1".into(),
3256 token: None,
3257 command: Some(Commands::Attach {
3258 name: "test-session".into(),
3259 }),
3260 path: None,
3261 };
3262 let result = execute(&cli).await;
3263 let err = result.unwrap_err().to_string();
3264 assert!(err.contains("Could not connect to pulpod"));
3265 }
3266
3267 #[tokio::test]
3268 async fn test_execute_attach_error_response() {
3269 use axum::{Router, http::StatusCode, routing::get};
3270 let app = Router::new().route(
3271 "/api/v1/sessions/{id}",
3272 get(|| async {
3273 (
3274 StatusCode::NOT_FOUND,
3275 r#"{"error":"session not found"}"#.to_owned(),
3276 )
3277 }),
3278 );
3279 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3280 let addr = listener.local_addr().unwrap();
3281 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3282
3283 let cli = Cli {
3284 node: format!("127.0.0.1:{}", addr.port()),
3285 token: None,
3286 command: Some(Commands::Attach {
3287 name: "nonexistent".into(),
3288 }),
3289 path: None,
3290 };
3291 let result = execute(&cli).await;
3292 let err = result.unwrap_err().to_string();
3293 assert!(err.contains("session not found"));
3294 }
3295
3296 #[tokio::test]
3297 async fn test_execute_attach_stale_session() {
3298 use axum::{Router, routing::get};
3299 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3300 let app = Router::new().route(
3301 "/api/v1/sessions/{id}",
3302 get(move || async move { session_json.to_owned() }),
3303 );
3304 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3305 let addr = listener.local_addr().unwrap();
3306 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3307
3308 let cli = Cli {
3309 node: format!("127.0.0.1:{}", addr.port()),
3310 token: None,
3311 command: Some(Commands::Attach {
3312 name: "stale-sess".into(),
3313 }),
3314 path: None,
3315 };
3316 let result = execute(&cli).await;
3317 let err = result.unwrap_err().to_string();
3318 assert!(err.contains("lost"));
3319 assert!(err.contains("pulpo resume"));
3320 }
3321
3322 #[tokio::test]
3323 async fn test_execute_attach_dead_session() {
3324 use axum::{Router, routing::get};
3325 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
3326 let app = Router::new().route(
3327 "/api/v1/sessions/{id}",
3328 get(move || async move { session_json.to_owned() }),
3329 );
3330 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3331 let addr = listener.local_addr().unwrap();
3332 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3333
3334 let cli = Cli {
3335 node: format!("127.0.0.1:{}", addr.port()),
3336 token: None,
3337 command: Some(Commands::Attach {
3338 name: "dead-sess".into(),
3339 }),
3340 path: None,
3341 };
3342 let result = execute(&cli).await;
3343 let err = result.unwrap_err().to_string();
3344 assert!(err.contains("killed"));
3345 assert!(err.contains("cannot attach"));
3346 }
3347
/// The `s` alias, followed by a name and a trailing command, must parse
/// into `Commands::Spawn`.
#[test]
fn test_cli_parse_alias_spawn() {
    let argv = ["pulpo", "s", "my-task", "--", "echo", "hello"];

    let parsed = Cli::try_parse_from(argv).unwrap();

    let is_spawn = matches!(parsed.command, Some(Commands::Spawn { .. }));
    assert!(is_spawn);
}
3355
/// The `ls` alias must parse into `Commands::List` with `all` unset.
#[test]
fn test_cli_parse_alias_list() {
    let parsed = Cli::try_parse_from(["pulpo", "ls"]).unwrap();

    let is_plain_list = matches!(parsed.command, Some(Commands::List { all: false }));
    assert!(is_plain_list);
}
3361
/// Both `ls -a` and `list --all` must parse into `Commands::List` with
/// `all` set.
#[test]
fn test_cli_parse_list_all() {
    let spellings = [["pulpo", "ls", "-a"], ["pulpo", "list", "--all"]];

    for argv in spellings {
        let parsed = Cli::try_parse_from(argv).unwrap();
        let lists_all = matches!(parsed.command, Some(Commands::List { all: true }));
        assert!(lists_all);
    }
}
3370
/// The `l` alias must parse into `Commands::Logs` with the given name.
#[test]
fn test_cli_parse_alias_logs() {
    let parsed = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();

    let is_expected = matches!(
        parsed.command,
        Some(Commands::Logs { ref name, .. }) if name == "my-session"
    );

    assert!(is_expected);
}
3379
/// The `k` alias must parse into `Commands::Kill` with the given name.
#[test]
fn test_cli_parse_alias_kill() {
    let parsed = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();

    let is_expected = matches!(
        parsed.command,
        Some(Commands::Kill { ref name }) if name == "my-session"
    );

    assert!(is_expected);
}
3388
/// The `rm` alias must parse into `Commands::Delete` with the given name.
#[test]
fn test_cli_parse_alias_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();

    let is_expected = matches!(
        parsed.command,
        Some(Commands::Delete { ref name }) if name == "my-session"
    );

    assert!(is_expected);
}
3397
/// The `r` alias must parse into `Commands::Resume` with the given name.
#[test]
fn test_cli_parse_alias_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();

    let is_expected = matches!(
        parsed.command,
        Some(Commands::Resume { ref name }) if name == "my-session"
    );

    assert!(is_expected);
}
3406
/// The `n` alias must parse into `Commands::Nodes`.
#[test]
fn test_cli_parse_alias_nodes() {
    let parsed = Cli::try_parse_from(["pulpo", "n"]).unwrap();

    let is_nodes = matches!(parsed.command, Some(Commands::Nodes));
    assert!(is_nodes);
}
3412
/// The `iv` alias must parse into `Commands::Interventions` with the given name.
#[test]
fn test_cli_parse_alias_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();

    let is_expected = matches!(
        parsed.command,
        Some(Commands::Interventions { ref name }) if name == "my-session"
    );

    assert!(is_expected);
}
3421
/// A JSON body with an `error` field must yield an error whose message is
/// that field's value.
#[test]
fn test_api_error_json() {
    let body = r#"{"error":"session not found: foo"}"#;

    let message = api_error(body).to_string();

    assert_eq!(message, "session not found: foo");
}
3427
/// A plain-text body must yield an error whose message is the body itself.
#[test]
fn test_api_error_plain_text() {
    let body = "plain text error";

    let message = api_error(body).to_string();

    assert_eq!(message, "plain text error");
}
3433
/// With an empty previous snapshot, the whole new output is the diff.
#[test]
fn test_diff_output_empty_prev() {
    let prev = "";
    let new = "line1\nline2\n";

    let delta = diff_output(prev, new);

    assert_eq!(delta, "line1\nline2\n");
}
3440
/// Identical snapshots must produce an empty diff.
#[test]
fn test_diff_output_identical() {
    let snapshot = "line1\nline2";

    let delta = diff_output(snapshot, snapshot);

    assert_eq!(delta, "");
}
3445
/// When the new snapshot extends the previous one, only the appended
/// lines are returned.
#[test]
fn test_diff_output_new_lines_appended() {
    let delta = diff_output("line1\nline2", "line1\nline2\nline3\nline4");

    assert_eq!(delta, "line3\nline4");
}
3452
/// When the window has scrolled by one line (first line dropped, one line
/// added), only the newly added line is returned.
#[test]
fn test_diff_output_scrolled_window() {
    let delta = diff_output("line1\nline2\nline3", "line2\nline3\nline4");

    assert_eq!(delta, "line4");
}
3460
/// When the snapshots share no lines, the whole new output is returned.
#[test]
fn test_diff_output_completely_different() {
    let delta = diff_output("aaa\nbbb", "xxx\nyyy");

    assert_eq!(delta, "xxx\nyyy");
}
3467
/// The previous snapshot's last line ("common") also appears in the new
/// one, but the line before it differs; the whole new output is expected.
#[test]
fn test_diff_output_last_line_matches_but_overlap_fails() {
    let delta = diff_output("aaa\ncommon", "zzz\ncommon\nnew_line");

    assert_eq!(delta, "zzz\ncommon\nnew_line");
}
3478
/// An empty new snapshot must produce an empty diff.
#[test]
fn test_diff_output_new_empty() {
    let prev = "line1";
    let new = "";

    let delta = diff_output(prev, new);

    assert_eq!(delta, "");
}
3483
3484 async fn start_follow_test_server() -> String {
3489 use axum::{Router, extract::Path, extract::Query, routing::get};
3490 use std::sync::Arc;
3491 use std::sync::atomic::{AtomicUsize, Ordering};
3492
3493 let call_count = Arc::new(AtomicUsize::new(0));
3494 let output_count = call_count.clone();
3495
3496 let app = Router::new()
3497 .route(
3498 "/api/v1/sessions/{id}/output",
3499 get(
3500 move |_path: Path<String>,
3501 _query: Query<std::collections::HashMap<String, String>>| {
3502 let count = output_count.clone();
3503 async move {
3504 let n = count.fetch_add(1, Ordering::SeqCst);
3505 let output = match n {
3506 0 => "line1\nline2".to_owned(),
3507 1 => "line1\nline2\nline3".to_owned(),
3508 _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
3509 };
3510 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
3511 }
3512 },
3513 ),
3514 )
3515 .route(
3516 "/api/v1/sessions/{id}",
3517 get(|_path: Path<String>| async {
3518 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3519 }),
3520 );
3521
3522 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3523 let addr = listener.local_addr().unwrap();
3524 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3525 format!("http://127.0.0.1:{}", addr.port())
3526 }
3527
3528 #[tokio::test]
3529 async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
3530 let base = start_follow_test_server().await;
3531 let client = reqwest::Client::new();
3532 let mut buf = Vec::new();
3533
3534 follow_logs(&client, &base, "test", 100, None, &mut buf)
3535 .await
3536 .unwrap();
3537
3538 let output = String::from_utf8(buf).unwrap();
3539 assert!(output.contains("line1"));
3541 assert!(output.contains("line2"));
3542 assert!(output.contains("line3"));
3543 assert!(output.contains("line4"));
3544 assert!(output.contains("[pulpo] Agent exited"));
3545 }
3546
3547 #[tokio::test]
3548 async fn test_execute_logs_follow_success() {
3549 let base = start_follow_test_server().await;
3550 let node = base.strip_prefix("http://").unwrap().to_owned();
3552
3553 let cli = Cli {
3554 node,
3555 token: None,
3556 command: Some(Commands::Logs {
3557 name: "test".into(),
3558 lines: 100,
3559 follow: true,
3560 }),
3561 path: None,
3562 };
3563 let result = execute(&cli).await.unwrap();
3565 assert_eq!(result, "");
3566 }
3567
3568 #[tokio::test]
3569 async fn test_execute_logs_follow_connection_refused() {
3570 let cli = Cli {
3571 node: "localhost:1".into(),
3572 token: None,
3573 command: Some(Commands::Logs {
3574 name: "test".into(),
3575 lines: 50,
3576 follow: true,
3577 }),
3578 path: None,
3579 };
3580 let result = execute(&cli).await;
3581 let err = result.unwrap_err().to_string();
3582 assert!(
3583 err.contains("Could not connect to pulpod"),
3584 "Expected friendly error, got: {err}"
3585 );
3586 }
3587
3588 #[tokio::test]
3589 async fn test_follow_logs_exits_on_dead() {
3590 use axum::{Router, extract::Path, extract::Query, routing::get};
3591
3592 let app = Router::new()
3593 .route(
3594 "/api/v1/sessions/{id}/output",
3595 get(
3596 |_path: Path<String>,
3597 _query: Query<std::collections::HashMap<String, String>>| async {
3598 r#"{"output":"some output"}"#.to_owned()
3599 },
3600 ),
3601 )
3602 .route(
3603 "/api/v1/sessions/{id}",
3604 get(|_path: Path<String>| async {
3605 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3606 }),
3607 );
3608
3609 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3610 let addr = listener.local_addr().unwrap();
3611 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3612 let base = format!("http://127.0.0.1:{}", addr.port());
3613
3614 let client = reqwest::Client::new();
3615 let mut buf = Vec::new();
3616 follow_logs(&client, &base, "test", 100, None, &mut buf)
3617 .await
3618 .unwrap();
3619
3620 let output = String::from_utf8(buf).unwrap();
3621 assert!(output.contains("some output"));
3622 }
3623
3624 #[tokio::test]
3625 async fn test_follow_logs_exits_on_stale() {
3626 use axum::{Router, extract::Path, extract::Query, routing::get};
3627
3628 let app = Router::new()
3629 .route(
3630 "/api/v1/sessions/{id}/output",
3631 get(
3632 |_path: Path<String>,
3633 _query: Query<std::collections::HashMap<String, String>>| async {
3634 r#"{"output":"stale output"}"#.to_owned()
3635 },
3636 ),
3637 )
3638 .route(
3639 "/api/v1/sessions/{id}",
3640 get(|_path: Path<String>| async {
3641 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3642 }),
3643 );
3644
3645 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3646 let addr = listener.local_addr().unwrap();
3647 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3648 let base = format!("http://127.0.0.1:{}", addr.port());
3649
3650 let client = reqwest::Client::new();
3651 let mut buf = Vec::new();
3652 follow_logs(&client, &base, "test", 100, None, &mut buf)
3653 .await
3654 .unwrap();
3655
3656 let output = String::from_utf8(buf).unwrap();
3657 assert!(output.contains("stale output"));
3658 }
3659
3660 #[tokio::test]
3661 async fn test_execute_logs_follow_non_reqwest_error() {
3662 use axum::{Router, extract::Path, extract::Query, routing::get};
3663
3664 let app = Router::new()
3666 .route(
3667 "/api/v1/sessions/{id}/output",
3668 get(
3669 |_path: Path<String>,
3670 _query: Query<std::collections::HashMap<String, String>>| async {
3671 r#"{"output":"initial"}"#.to_owned()
3672 },
3673 ),
3674 )
3675 .route(
3676 "/api/v1/sessions/{id}",
3677 get(|_path: Path<String>| async { "not valid json".to_owned() }),
3678 );
3679
3680 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3681 let addr = listener.local_addr().unwrap();
3682 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3683 let node = format!("127.0.0.1:{}", addr.port());
3684
3685 let cli = Cli {
3686 node,
3687 token: None,
3688 command: Some(Commands::Logs {
3689 name: "test".into(),
3690 lines: 100,
3691 follow: true,
3692 }),
3693 path: None,
3694 };
3695 let err = execute(&cli).await.unwrap_err();
3696 let msg = err.to_string();
3698 assert!(
3699 msg.contains("expected ident"),
3700 "Expected serde parse error, got: {msg}"
3701 );
3702 }
3703
3704 #[tokio::test]
3705 async fn test_fetch_session_status_connection_error() {
3706 let client = reqwest::Client::new();
3707 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3708 assert!(result.is_err());
3709 }
3710
// An empty schedule list renders as the "No schedules." placeholder.
#[test]
fn test_format_schedules_empty() {
    let rendered = format_schedules(&[]);
    assert_eq!(rendered, "No schedules.");
}
3717
// An enabled schedule with no last run and no target node: the rendered
// table must contain its name, its cron expression, "local", "yes" and '-'.
#[test]
fn test_format_schedules_with_entries() {
    let entry = serde_json::json!({
        "name": "nightly",
        "cron": "0 3 * * *",
        "enabled": true,
        "last_run_at": null,
        "target_node": null
    });
    let rows = vec![entry];
    let table = format_schedules(&rows);

    for expected in ["nightly", "0 3 * * *", "local", "yes"] {
        assert!(table.contains(expected), "table is missing {expected:?}");
    }
    assert!(table.contains('-'));
}
3734
// A disabled schedule with a last run and a target node: the rendered table
// must contain its name, "no", the node name and the start of the timestamp.
#[test]
fn test_format_schedules_disabled_entry() {
    let entry = serde_json::json!({
        "name": "weekly",
        "cron": "0 0 * * 0",
        "enabled": false,
        "last_run_at": "2026-03-18T03:00:00Z",
        "target_node": "gpu-box"
    });
    let rows = vec![entry];
    let table = format_schedules(&rows);

    for expected in ["weekly", "no", "gpu-box", "2026-03-18T03:00"] {
        assert!(table.contains(expected), "table is missing {expected:?}");
    }
}
3750
// The schedule table carries all five column headings.
#[test]
fn test_format_schedules_header() {
    let entry = serde_json::json!({
        "name": "test",
        "cron": "* * * * *",
        "enabled": true,
        "last_run_at": null,
        "target_node": null
    });
    let rows = vec![entry];
    let table = format_schedules(&rows);

    for heading in ["NAME", "CRON", "ENABLED", "LAST RUN", "NODE"] {
        assert!(table.contains(heading), "table is missing heading {heading:?}");
    }
}
3767
// `schedule add <name> <cron> --workdir <dir> -- <command...>` parses into
// `ScheduleAction::Add` with the given name and cron expression.
#[test]
fn test_cli_parse_schedule_add() {
    let argv = [
        "pulpo",
        "schedule",
        "add",
        "nightly",
        "0 3 * * *",
        "--workdir",
        "/repo",
        "--",
        "claude",
        "-p",
        "review",
    ];
    let cli = Cli::try_parse_from(argv).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Add { name, cron, .. },
    }) = &cli.command
    else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert_eq!(name, "nightly");
    assert_eq!(cron, "0 3 * * *");
}
3793
// `schedule add ... --node <node>` stores the target node on the Add action.
#[test]
fn test_cli_parse_schedule_add_with_node() {
    let argv = [
        "pulpo",
        "schedule",
        "add",
        "nightly",
        "0 3 * * *",
        "--workdir",
        "/repo",
        "--node",
        "gpu-box",
        "--",
        "claude",
    ];
    let cli = Cli::try_parse_from(argv).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Add { node, .. },
    }) = &cli.command
    else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert_eq!(node.as_deref(), Some("gpu-box"));
}
3817
// `schedule install` is accepted as an alias of `schedule add`.
#[test]
fn test_cli_parse_schedule_add_install_alias() {
    let argv = ["pulpo", "schedule", "install", "nightly", "0 3 * * *"];
    let cli = Cli::try_parse_from(argv).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Add { name, .. },
    }) = &cli.command
    else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert_eq!(name, "nightly");
}
3829
// `schedule list` parses into `ScheduleAction::List`.
#[test]
fn test_cli_parse_schedule_list() {
    let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
    match &cli.command {
        Some(Commands::Schedule {
            action: ScheduleAction::List,
        }) => {}
        other => panic!("unexpected parse result: {other:?}"),
    }
}
3840
// `schedule remove <name>` parses into `ScheduleAction::Remove`.
#[test]
fn test_cli_parse_schedule_remove() {
    let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Remove { name },
    }) = &cli.command
    else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert_eq!(name, "nightly");
}
3851
// `schedule pause <name>` parses into `ScheduleAction::Pause`.
#[test]
fn test_cli_parse_schedule_pause() {
    let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Pause { name },
    }) = &cli.command
    else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert_eq!(name, "nightly");
}
3862
// `schedule resume <name>` parses into `ScheduleAction::Resume`.
#[test]
fn test_cli_parse_schedule_resume() {
    let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Resume { name },
    }) = &cli.command
    else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert_eq!(name, "nightly");
}
3873
// `sched` is accepted as an alias of the `schedule` subcommand.
#[test]
fn test_cli_parse_schedule_alias() {
    let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
    match &cli.command {
        Some(Commands::Schedule {
            action: ScheduleAction::List,
        }) => {}
        other => panic!("unexpected parse result: {other:?}"),
    }
}
3884
// `schedule ls` is accepted as an alias of `schedule list`.
#[test]
fn test_cli_parse_schedule_list_alias() {
    let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
    match &cli.command {
        Some(Commands::Schedule {
            action: ScheduleAction::List,
        }) => {}
        other => panic!("unexpected parse result: {other:?}"),
    }
}
3895
// `schedule rm <name>` is accepted as an alias of `schedule remove`.
#[test]
fn test_cli_parse_schedule_remove_alias() {
    let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();

    let Some(Commands::Schedule {
        action: ScheduleAction::Remove { name },
    }) = &cli.command
    else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert_eq!(name, "nightly");
}
3906
3907 #[tokio::test]
3908 async fn test_execute_schedule_list_via_execute() {
3909 let node = start_test_server().await;
3910 let cli = Cli {
3911 node,
3912 token: None,
3913 command: Some(Commands::Schedule {
3914 action: ScheduleAction::List,
3915 }),
3916 path: None,
3917 };
3918 let result = execute(&cli).await.unwrap();
3919 #[cfg(coverage)]
3921 assert!(result.is_empty());
3922 #[cfg(not(coverage))]
3923 assert_eq!(result, "No schedules.");
3924 }
3925
// The `Debug` rendering of `ScheduleAction::List` is the bare variant name.
#[test]
fn test_schedule_action_debug() {
    let action = ScheduleAction::List;
    let debug_repr = format!("{action:?}");
    assert_eq!(debug_repr, "List");
}
3931
// `send` is accepted as an alias of `input`, with session name and text.
#[test]
fn test_cli_parse_send_alias() {
    let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();

    let Some(Commands::Input { name, text }) = &cli.command else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("y"));
}
3940
// A bare `spawn` parses with no session name and an empty command.
#[test]
fn test_cli_parse_spawn_no_name() {
    let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();

    let Some(Commands::Spawn { name, command, .. }) = &cli.command else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert!(name.is_none());
    assert!(command.is_empty());
}
3949
// `spawn -- <command...>` leaves the name unset and captures the trailing
// arguments as the command.
#[test]
fn test_cli_parse_spawn_optional_name_with_command() {
    let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();

    let Some(Commands::Spawn { name, command, .. }) = &cli.command else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert!(name.is_none());
    assert!(command == &["echo", "hello"]);
}
3959
// A lone positional argument is stored in `path` with no subcommand set.
#[test]
fn test_cli_parse_path_shortcut() {
    let parsed = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
    assert!(parsed.command.is_none());
    let shortcut = parsed.path.as_deref();
    assert_eq!(shortcut, Some("/tmp/my-repo"));
}
3966
// With no arguments at all, neither a subcommand nor a path is set.
#[test]
fn test_cli_parse_no_args() {
    let parsed = Cli::try_parse_from(["pulpo"]).unwrap();
    let Cli { command, path, .. } = &parsed;
    assert!(command.is_none());
    assert!(path.is_none());
}
3973
// The last path component is used as the session name.
#[test]
fn test_derive_session_name_simple() {
    let derived = derive_session_name("/home/user/my-repo");
    assert_eq!(derived, "my-repo");
}
3978
// "My Repo_v2" is expected to come out as "my-repo-v2".
#[test]
fn test_derive_session_name_with_special_chars() {
    let derived = derive_session_name("/home/user/My Repo_v2");
    assert_eq!(derived, "my-repo-v2");
}
3983
// The root path is expected to yield the name "session".
#[test]
fn test_derive_session_name_root() {
    let derived = derive_session_name("/");
    assert_eq!(derived, "session");
}
3988
// ".hidden" is expected to come out as "hidden".
#[test]
fn test_derive_session_name_dots() {
    let derived = derive_session_name("/home/user/.hidden");
    assert_eq!(derived, "hidden");
}
3993
// An absolute path is returned unchanged.
#[test]
fn test_resolve_path_absolute() {
    let resolved = resolve_path("/tmp/repo");
    assert_eq!(resolved, "/tmp/repo");
}
3998
// A relative path resolves to something that starts with '/' and still ends
// with the original component.
#[test]
fn test_resolve_path_relative() {
    let absolute = resolve_path("my-repo");
    let keeps_component = absolute.ends_with("my-repo");
    assert!(keeps_component);
    let is_rooted = absolute.starts_with('/');
    assert!(is_rooted);
}
4005
4006 #[tokio::test]
4007 async fn test_execute_no_args_shows_help() {
4008 let node = start_test_server().await;
4009 let cli = Cli {
4010 node,
4011 token: None,
4012 path: None,
4013 command: None,
4014 };
4015 let result = execute(&cli).await.unwrap();
4016 assert!(
4017 result.is_empty(),
4018 "no-args should return empty string after printing help"
4019 );
4020 }
4021
4022 #[tokio::test]
4023 async fn test_execute_path_shortcut() {
4024 let node = start_test_server().await;
4025 let cli = Cli {
4026 node,
4027 token: None,
4028 path: Some("/tmp".into()),
4029 command: None,
4030 };
4031 let result = execute(&cli).await.unwrap();
4032 assert!(result.contains("Detached from session"));
4033 }
4034
4035 #[tokio::test]
4036 async fn test_deduplicate_session_name_no_conflict() {
4037 let base = "http://127.0.0.1:1";
4039 let client = reqwest::Client::new();
4040 let name = deduplicate_session_name(&client, base, "fresh", None).await;
4041 assert_eq!(name, "fresh");
4042 }
4043
4044 #[tokio::test]
4045 async fn test_deduplicate_session_name_with_conflict() {
4046 use axum::{Router, routing::get};
4047 use std::sync::atomic::{AtomicU32, Ordering};
4048
4049 let call_count = std::sync::Arc::new(AtomicU32::new(0));
4050 let counter = call_count.clone();
4051 let app = Router::new()
4052 .route(
4053 "/api/v1/sessions/{id}",
4054 get(move || {
4055 let c = counter.clone();
4056 async move {
4057 let n = c.fetch_add(1, Ordering::SeqCst);
4058 if n == 0 {
4059 (axum::http::StatusCode::OK, TEST_SESSION_JSON.to_owned())
4061 } else {
4062 (axum::http::StatusCode::NOT_FOUND, "not found".to_owned())
4064 }
4065 }
4066 }),
4067 )
4068 .route(
4069 "/api/v1/peers",
4070 get(|| async {
4071 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
4072 }),
4073 );
4074 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4075 let addr = listener.local_addr().unwrap();
4076 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4077 let base = format!("http://127.0.0.1:{}", addr.port());
4078 let client = reqwest::Client::new();
4079 let name = deduplicate_session_name(&client, &base, "repo", None).await;
4080 assert_eq!(name, "repo-2");
4081 }
4082
// Table-driven check of `node_needs_resolution`: the first group must be
// reported as not needing resolution, the second group as needing it.
#[test]
fn test_node_needs_resolution() {
    let no_resolution = [
        "localhost:7433",
        "mac-mini:7433",
        "10.0.0.1:7433",
        "[::1]:7433",
    ];
    for node in no_resolution {
        assert!(
            !node_needs_resolution(node),
            "{node:?} should not need resolution"
        );
    }

    let needs_resolution = ["mac-mini", "linux-server", "localhost"];
    for node in needs_resolution {
        assert!(
            node_needs_resolution(node),
            "{node:?} should need resolution"
        );
    }
}
4095
4096 #[tokio::test]
4097 async fn test_resolve_node_with_port() {
4098 let client = reqwest::Client::new();
4099 let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
4100 assert_eq!(addr, "mac-mini:7433");
4101 assert!(token.is_none());
4102 }
4103
4104 #[tokio::test]
4105 async fn test_resolve_node_fallback_appends_port() {
4106 let client = reqwest::Client::new();
4109 let (addr, token) = resolve_node(&client, "unknown-host").await;
4110 assert_eq!(addr, "unknown-host:7433");
4111 assert!(token.is_none());
4112 }
4113
4114 #[cfg(not(coverage))]
4115 #[tokio::test]
4116 async fn test_resolve_node_finds_peer() {
4117 use axum::{Router, routing::get};
4118
4119 let app = Router::new()
4120 .route(
4121 "/api/v1/peers",
4122 get(|| async {
4123 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
4124 }),
4125 )
4126 .route(
4127 "/api/v1/config",
4128 get(|| async {
4129 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4130 }),
4131 );
4132
4133 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4135 return;
4136 };
4137 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4138
4139 let client = reqwest::Client::new();
4140 let (addr, token) = resolve_node(&client, "mac-mini").await;
4141 assert_eq!(addr, "10.0.0.5:7433");
4142 assert_eq!(token, Some("peer-secret".into()));
4143 }
4144
4145 #[cfg(not(coverage))]
4146 #[tokio::test]
4147 async fn test_resolve_node_peer_no_token() {
4148 use axum::{Router, routing::get};
4149
4150 let app = Router::new()
4151 .route(
4152 "/api/v1/peers",
4153 get(|| async {
4154 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4155 }),
4156 )
4157 .route(
4158 "/api/v1/config",
4159 get(|| async {
4160 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
4161 }),
4162 );
4163
4164 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
4165 return; };
4167 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4168
4169 let client = reqwest::Client::new();
4170 let (addr, token) = resolve_node(&client, "test-peer").await;
4171 assert_eq!(addr, "10.0.0.9:7433");
4172 assert!(token.is_none()); }
4174
4175 #[tokio::test]
4176 async fn test_execute_with_peer_name_resolution() {
4177 let cli = Cli {
4181 node: "nonexistent-peer".into(),
4182 token: None,
4183 command: Some(Commands::List { all: false }),
4184 path: None,
4185 };
4186 let result = execute(&cli).await;
4187 assert!(result.is_err());
4189 }
4190
// `spawn <name> --auto` sets the `auto` flag.
#[test]
fn test_cli_parse_spawn_auto() {
    let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();

    let Some(Commands::Spawn { auto, .. }) = &cli.command else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert!(*auto);
}
4201
// Without `--auto`, the `auto` flag stays false.
#[test]
fn test_cli_parse_spawn_auto_default() {
    let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();

    let Some(Commands::Spawn { auto, .. }) = &cli.command else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert!(!*auto);
}
4210
4211 #[tokio::test]
4212 async fn test_select_best_node_coverage_stub() {
4213 let client = reqwest::Client::new();
4215 let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
4218 }
4219
4220 #[cfg(not(coverage))]
4221 #[tokio::test]
4222 async fn test_select_best_node_picks_least_loaded() {
4223 use axum::{Router, routing::get};
4224
4225 let app = Router::new().route(
4226 "/api/v1/peers",
4227 get(|| async {
4228 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
4229 }),
4230 );
4231 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4232 let addr = listener.local_addr().unwrap();
4233 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4234 let base = format!("http://127.0.0.1:{}", addr.port());
4235
4236 let client = reqwest::Client::new();
4237 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4238 assert_eq!(name, "idle");
4242 assert_eq!(addr, "idle:7433");
4243 }
4244
4245 #[cfg(not(coverage))]
4246 #[tokio::test]
4247 async fn test_select_best_node_no_online_peers_falls_back_to_local() {
4248 use axum::{Router, routing::get};
4249
4250 let app = Router::new().route(
4251 "/api/v1/peers",
4252 get(|| async {
4253 r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4254 }),
4255 );
4256 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4257 let addr = listener.local_addr().unwrap();
4258 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4259 let base = format!("http://127.0.0.1:{}", addr.port());
4260
4261 let client = reqwest::Client::new();
4262 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4263 assert_eq!(name, "my-mac");
4264 assert_eq!(addr, "localhost:7433");
4265 }
4266
4267 #[cfg(not(coverage))]
4268 #[tokio::test]
4269 async fn test_select_best_node_empty_peers_falls_back_to_local() {
4270 use axum::{Router, routing::get};
4271
4272 let app = Router::new().route(
4273 "/api/v1/peers",
4274 get(|| async {
4275 r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
4276 }),
4277 );
4278 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4279 let addr = listener.local_addr().unwrap();
4280 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4281 let base = format!("http://127.0.0.1:{}", addr.port());
4282
4283 let client = reqwest::Client::new();
4284 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4285 assert_eq!(name, "solo");
4286 assert_eq!(addr, "localhost:7433");
4287 }
4288
4289 #[cfg(not(coverage))]
4290 #[tokio::test]
4291 async fn test_execute_spawn_auto_selects_node() {
4292 use axum::{
4293 Router,
4294 http::StatusCode,
4295 routing::{get, post},
4296 };
4297
4298 let create_json = test_create_response_json();
4299
4300 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4302 let addr = listener.local_addr().unwrap();
4303 let node = format!("127.0.0.1:{}", addr.port());
4304 let peer_addr = node.clone();
4305
4306 let app = Router::new()
4307 .route(
4308 "/api/v1/peers",
4309 get(move || {
4310 let peer_addr = peer_addr.clone();
4311 async move {
4312 format!(
4313 r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
4314 )
4315 }
4316 }),
4317 )
4318 .route(
4319 "/api/v1/sessions",
4320 post(move || async move {
4321 (StatusCode::CREATED, create_json.clone())
4322 }),
4323 );
4324
4325 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4326
4327 let cli = Cli {
4328 node,
4329 token: None,
4330 command: Some(Commands::Spawn {
4331 name: Some("test".into()),
4332 workdir: Some("/tmp/repo".into()),
4333 ink: None,
4334 description: None,
4335 detach: true,
4336 idle_threshold: None,
4337 auto: true,
4338 worktree: false,
4339 runtime: None,
4340 secret: vec![],
4341 command: vec!["echo".into(), "hello".into()],
4342 }),
4343 path: None,
4344 };
4345 let result = execute(&cli).await.unwrap();
4346 assert!(result.contains("Created session"));
4347 }
4348
4349 #[cfg(not(coverage))]
4350 #[tokio::test]
4351 async fn test_select_best_node_peer_no_session_count() {
4352 use axum::{Router, routing::get};
4353
4354 let app = Router::new().route(
4355 "/api/v1/peers",
4356 get(|| async {
4357 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
4358 }),
4359 );
4360 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
4361 let addr = listener.local_addr().unwrap();
4362 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
4363 let base = format!("http://127.0.0.1:{}", addr.port());
4364
4365 let client = reqwest::Client::new();
4366 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
4367 assert_eq!(name, "fresh");
4369 assert_eq!(addr, "fresh:7433");
4370 }
4371
// `secret set <name> <value>` parses into `SecretAction::Set` with `env`
// left unset.
#[test]
fn test_cli_parse_secret_set() {
    let cli = Cli::try_parse_from(["pulpo", "secret", "set", "MY_TOKEN", "abc123"]).unwrap();

    let Some(Commands::Secret {
        action: SecretAction::Set { name, value, env },
    }) = &cli.command
    else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert_eq!(name, "MY_TOKEN");
    assert_eq!(value, "abc123");
    assert!(env.is_none());
}
4383
// `secret list` parses into `SecretAction::List`.
#[test]
fn test_cli_parse_secret_list() {
    let cli = Cli::try_parse_from(["pulpo", "secret", "list"]).unwrap();
    match &cli.command {
        Some(Commands::Secret {
            action: SecretAction::List,
        }) => {}
        other => panic!("unexpected parse result: {other:?}"),
    }
}
4394
// `secret ls` is accepted as an alias of `secret list`.
#[test]
fn test_cli_parse_secret_list_alias() {
    let cli = Cli::try_parse_from(["pulpo", "secret", "ls"]).unwrap();
    match &cli.command {
        Some(Commands::Secret {
            action: SecretAction::List,
        }) => {}
        other => panic!("unexpected parse result: {other:?}"),
    }
}
4405
// `secret delete <name>` parses into `SecretAction::Delete`.
#[test]
fn test_cli_parse_secret_delete() {
    let cli = Cli::try_parse_from(["pulpo", "secret", "delete", "MY_TOKEN"]).unwrap();

    let Some(Commands::Secret {
        action: SecretAction::Delete { name },
    }) = &cli.command
    else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert_eq!(name, "MY_TOKEN");
}
4415
// `secret rm <name>` is accepted as an alias of `secret delete`.
#[test]
fn test_cli_parse_secret_delete_alias() {
    let cli = Cli::try_parse_from(["pulpo", "secret", "rm", "MY_TOKEN"]).unwrap();

    let Some(Commands::Secret {
        action: SecretAction::Delete { name },
    }) = &cli.command
    else {
        panic!("unexpected parse result: {:?}", cli.command);
    };
    assert_eq!(name, "MY_TOKEN");
}
4425
// `sec` is accepted as an alias of the `secret` subcommand.
#[test]
fn test_cli_parse_secret_alias() {
    let cli = Cli::try_parse_from(["pulpo", "sec", "list"]).unwrap();
    match &cli.command {
        Some(Commands::Secret {
            action: SecretAction::List,
        }) => {}
        other => panic!("unexpected parse result: {other:?}"),
    }
}
4436
// An empty secret list renders as the "No secrets configured." placeholder.
#[test]
fn test_format_secrets_empty() {
    let none = Vec::<serde_json::Value>::new();
    let rendered = format_secrets(&none);
    assert_eq!(rendered, "No secrets configured.");
}
4442
// Two secrets without an `env` field: both names and the three column
// headings must appear in the rendered table.
#[test]
fn test_format_secrets_with_entries() {
    let rows = vec![
        serde_json::json!({"name": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
        serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
    ];
    let table = format_secrets(&rows);

    for expected in ["GITHUB_TOKEN", "NPM_TOKEN", "NAME", "ENV", "CREATED"] {
        assert!(table.contains(expected), "table is missing {expected:?}");
    }
}
4456
// One secret with an explicit `env` and one without: the secret names and
// the env value must all appear in the rendered table.
#[test]
fn test_format_secrets_with_env() {
    let rows = vec![
        serde_json::json!({"name": "GH_WORK", "env": "GITHUB_TOKEN", "created_at": "2026-03-21T12:00:00Z"}),
        serde_json::json!({"name": "NPM_TOKEN", "created_at": "2026-03-20T10:30:00Z"}),
    ];
    let table = format_secrets(&rows);

    for expected in ["GH_WORK", "GITHUB_TOKEN", "NPM_TOKEN"] {
        assert!(table.contains(expected), "table is missing {expected:?}");
    }
}
4468
// A very short `created_at` value ("now") must still show up in the output.
#[test]
fn test_format_secrets_short_timestamp() {
    let rows = vec![serde_json::json!({"name": "KEY", "created_at": "now"})];
    let table = format_secrets(&rows);
    assert!(table.contains("now"));
}
4475
// A very short `last_run_at` value ("short") must still show up in the
// output.
#[test]
fn test_format_schedules_short_last_run_at() {
    let entry = serde_json::json!({
        "name": "test",
        "cron": "* * * * *",
        "enabled": true,
        "last_run_at": "short",
        "target_node": null
    });
    let rows = vec![entry];
    let table = format_schedules(&rows);
    assert!(table.contains("short"));
}
4489
// A session whose command is full of multi-byte characters must render with
// a "..." truncation marker.
#[test]
fn test_format_sessions_multibyte_command_truncation() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    // Command made of emoji so that any truncation has to cope with
    // multi-byte characters.
    let emoji_session = Session {
        id: Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        command: "echo '\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}\u{1F600}'".into(),
        description: None,
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        worktree_path: None,
        runtime: Runtime::Tmux,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let sessions = vec![emoji_session];

    let table = format_sessions(&sessions);
    assert!(table.contains("..."));
}
4523}