1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4 AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
5 PeersResponse,
6};
7use pulpo_common::session::Session;
8
9#[derive(Parser, Debug)]
10#[command(
11 name = "pulpo",
12 about = "Manage agent sessions across your machines",
13 version = env!("PULPO_VERSION"),
14 args_conflicts_with_subcommands = true
15)]
16pub struct Cli {
17 #[arg(long, default_value = "localhost:7433")]
19 pub node: String,
20
21 #[arg(long)]
23 pub token: Option<String>,
24
25 #[command(subcommand)]
26 pub command: Option<Commands>,
27
28 #[arg(value_name = "PATH")]
30 pub path: Option<String>,
31}
32
33#[derive(Subcommand, Debug)]
34#[allow(clippy::large_enum_variant)]
35pub enum Commands {
36 #[command(visible_alias = "a")]
38 Attach {
39 name: String,
41 },
42
43 #[command(visible_alias = "i", visible_alias = "send")]
45 Input {
46 name: String,
48 text: Option<String>,
50 },
51
52 #[command(visible_alias = "s")]
54 Spawn {
55 name: Option<String>,
57
58 #[arg(long)]
60 workdir: Option<String>,
61
62 #[arg(long)]
64 ink: Option<String>,
65
66 #[arg(long)]
68 description: Option<String>,
69
70 #[arg(short, long)]
72 detach: bool,
73
74 #[arg(long)]
76 idle_threshold: Option<u32>,
77
78 #[arg(long)]
80 auto: bool,
81
82 #[arg(long)]
84 worktree: bool,
85
86 #[arg(long)]
88 sandbox: bool,
89
90 #[arg(last = true)]
92 command: Vec<String>,
93 },
94
95 #[command(visible_alias = "ls")]
97 List,
98
99 #[command(visible_alias = "l")]
101 Logs {
102 name: String,
104
105 #[arg(long, default_value = "100")]
107 lines: usize,
108
109 #[arg(short, long)]
111 follow: bool,
112 },
113
114 #[command(visible_alias = "k")]
116 Kill {
117 name: String,
119 },
120
121 #[command(visible_alias = "rm")]
123 Delete {
124 name: String,
126 },
127
128 #[command(visible_alias = "r")]
130 Resume {
131 name: String,
133 },
134
135 #[command(visible_alias = "n")]
137 Nodes,
138
139 #[command(visible_alias = "iv")]
141 Interventions {
142 name: String,
144 },
145
146 Ui,
148
149 #[command(visible_alias = "sched")]
151 Schedule {
152 #[command(subcommand)]
153 action: ScheduleAction,
154 },
155}
156
157#[derive(Subcommand, Debug)]
158pub enum ScheduleAction {
159 #[command(alias = "install")]
161 Add {
162 name: String,
164 cron: String,
166 #[arg(long)]
168 workdir: Option<String>,
169 #[arg(long)]
171 node: Option<String>,
172 #[arg(long)]
174 ink: Option<String>,
175 #[arg(long)]
177 description: Option<String>,
178 #[arg(last = true)]
180 command: Vec<String>,
181 },
182 #[command(alias = "ls")]
184 List,
185 #[command(alias = "rm")]
187 Remove {
188 name: String,
190 },
191 Pause {
193 name: String,
195 },
196 Resume {
198 name: String,
200 },
201}
202
203const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
205
206fn resolve_path(path: &str) -> String {
208 let p = std::path::Path::new(path);
209 if p.is_absolute() {
210 path.to_owned()
211 } else {
212 std::env::current_dir().map_or_else(
213 |_| path.to_owned(),
214 |cwd| cwd.join(p).to_string_lossy().into_owned(),
215 )
216 }
217}
218
219fn derive_session_name(path: &str) -> String {
221 let basename = std::path::Path::new(path)
222 .file_name()
223 .and_then(|n| n.to_str())
224 .unwrap_or("session");
225 let kebab: String = basename
227 .chars()
228 .map(|c| {
229 if c.is_ascii_alphanumeric() {
230 c.to_ascii_lowercase()
231 } else {
232 '-'
233 }
234 })
235 .collect();
236 let mut result = String::new();
238 for c in kebab.chars() {
239 if c == '-' && result.ends_with('-') {
240 continue;
241 }
242 result.push(c);
243 }
244 let result = result.trim_matches('-').to_owned();
245 if result.is_empty() {
246 "session".to_owned()
247 } else {
248 result
249 }
250}
251
252async fn deduplicate_session_name(
254 client: &reqwest::Client,
255 base: &str,
256 name: &str,
257 token: Option<&str>,
258) -> String {
259 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
261 .send()
262 .await;
263 match resp {
264 Ok(r) if r.status().is_success() => {
265 for i in 2..=99 {
267 let candidate = format!("{name}-{i}");
268 let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
269 .send()
270 .await;
271 match resp {
272 Ok(r) if r.status().is_success() => {}
273 _ => return candidate,
274 }
275 }
276 format!("{name}-100")
277 }
278 _ => name.to_owned(),
279 }
280}
281
282pub fn base_url(node: &str) -> String {
284 format!("http://{node}")
285}
286
287#[derive(serde::Deserialize)]
289struct OutputResponse {
290 output: String,
291}
292
293fn format_sessions(sessions: &[Session]) -> String {
295 if sessions.is_empty() {
296 return "No sessions.".into();
297 }
298 let mut lines = vec![format!("{:<20} {:<12} {}", "NAME", "STATUS", "COMMAND")];
299 for s in sessions {
300 let cmd_display = if s.command.len() > 50 {
301 format!("{}...", &s.command[..47])
302 } else {
303 s.command.clone()
304 };
305 lines.push(format!("{:<20} {:<12} {}", s.name, s.status, cmd_display));
306 }
307 lines.join("\n")
308}
309
310fn format_nodes(resp: &PeersResponse) -> String {
312 let mut lines = vec![format!(
313 "{:<20} {:<25} {:<10} {}",
314 "NAME", "ADDRESS", "STATUS", "SESSIONS"
315 )];
316 lines.push(format!(
317 "{:<20} {:<25} {:<10} {}",
318 resp.local.name, "(local)", "online", "-"
319 ));
320 for p in &resp.peers {
321 let sessions = p
322 .session_count
323 .map_or_else(|| "-".into(), |c| c.to_string());
324 lines.push(format!(
325 "{:<20} {:<25} {:<10} {}",
326 p.name, p.address, p.status, sessions
327 ));
328 }
329 lines.join("\n")
330}
331
332fn format_interventions(events: &[InterventionEventResponse]) -> String {
334 if events.is_empty() {
335 return "No intervention events.".into();
336 }
337 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
338 for e in events {
339 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
340 }
341 lines.join("\n")
342}
343
344#[cfg_attr(coverage, allow(dead_code))]
346fn build_open_command(url: &str) -> std::process::Command {
347 #[cfg(target_os = "macos")]
348 {
349 let mut cmd = std::process::Command::new("open");
350 cmd.arg(url);
351 cmd
352 }
353 #[cfg(target_os = "linux")]
354 {
355 let mut cmd = std::process::Command::new("xdg-open");
356 cmd.arg(url);
357 cmd
358 }
359 #[cfg(not(any(target_os = "macos", target_os = "linux")))]
360 {
361 let mut cmd = std::process::Command::new("xdg-open");
363 cmd.arg(url);
364 cmd
365 }
366}
367
368#[cfg(not(coverage))]
370fn open_browser(url: &str) -> Result<()> {
371 build_open_command(url).status()?;
372 Ok(())
373}
374
375#[cfg(coverage)]
377fn open_browser(_url: &str) -> Result<()> {
378 Ok(())
379}
380
381#[cfg_attr(coverage, allow(dead_code))]
384fn build_attach_command(backend_session_id: &str) -> std::process::Command {
385 let mut cmd = std::process::Command::new("tmux");
386 cmd.args(["attach-session", "-t", backend_session_id]);
387 cmd
388}
389
390#[cfg(not(any(test, coverage)))]
392fn attach_session(backend_session_id: &str) -> Result<()> {
393 let status = build_attach_command(backend_session_id).status()?;
394 if !status.success() {
395 anyhow::bail!("attach failed with {status}");
396 }
397 Ok(())
398}
399
400#[cfg(any(test, coverage))]
402#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
403fn attach_session(_backend_session_id: &str) -> Result<()> {
404 Ok(())
405}
406
407fn api_error(text: &str) -> anyhow::Error {
409 serde_json::from_str::<serde_json::Value>(text)
410 .ok()
411 .and_then(|v| v["error"].as_str().map(String::from))
412 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
413}
414
415async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
417 if resp.status().is_success() {
418 Ok(resp.text().await?)
419 } else {
420 let text = resp.text().await?;
421 Err(api_error(&text))
422 }
423}
424
425fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
427 if err.is_connect() {
428 anyhow::anyhow!(
429 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
430 )
431 } else {
432 anyhow::anyhow!("Network error connecting to {node}: {err}")
433 }
434}
435
436fn is_localhost(node: &str) -> bool {
438 let host = node.split(':').next().unwrap_or(node);
439 host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
440}
441
442async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
444 let resp = client
445 .get(format!("{base}/api/v1/auth/token"))
446 .send()
447 .await
448 .ok()?;
449 let body: AuthTokenResponse = resp.json().await.ok()?;
450 if body.token.is_empty() {
451 None
452 } else {
453 Some(body.token)
454 }
455}
456
457async fn resolve_token(
459 client: &reqwest::Client,
460 base: &str,
461 node: &str,
462 explicit: Option<&str>,
463) -> Option<String> {
464 if let Some(t) = explicit {
465 return Some(t.to_owned());
466 }
467 if is_localhost(node) {
468 return discover_token(client, base).await;
469 }
470 None
471}
472
473fn node_needs_resolution(node: &str) -> bool {
475 !node.contains(':')
476}
477
478#[cfg(not(coverage))]
485async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
486 if !node_needs_resolution(node) {
488 return (node.to_owned(), None);
489 }
490
491 let local_base = "http://localhost:7433";
493 let mut resolved_address: Option<String> = None;
494
495 if let Ok(resp) = client
496 .get(format!("{local_base}/api/v1/peers"))
497 .send()
498 .await
499 && let Ok(peers_resp) = resp.json::<PeersResponse>().await
500 {
501 for peer in &peers_resp.peers {
502 if peer.name == node {
503 resolved_address = Some(peer.address.clone());
504 break;
505 }
506 }
507 }
508
509 let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
510
511 let peer_token = if let Ok(resp) = client
513 .get(format!("{local_base}/api/v1/config"))
514 .send()
515 .await
516 && let Ok(config) = resp.json::<ConfigResponse>().await
517 && let Some(entry) = config.peers.get(node)
518 {
519 entry.token().map(String::from)
520 } else {
521 None
522 };
523
524 (address, peer_token)
525}
526
527#[cfg(coverage)]
529async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
530 if node_needs_resolution(node) {
531 (format!("{node}:7433"), None)
532 } else {
533 (node.to_owned(), None)
534 }
535}
536
537fn authed_get(
539 client: &reqwest::Client,
540 url: String,
541 token: Option<&str>,
542) -> reqwest::RequestBuilder {
543 let req = client.get(url);
544 if let Some(t) = token {
545 req.bearer_auth(t)
546 } else {
547 req
548 }
549}
550
551fn authed_post(
553 client: &reqwest::Client,
554 url: String,
555 token: Option<&str>,
556) -> reqwest::RequestBuilder {
557 let req = client.post(url);
558 if let Some(t) = token {
559 req.bearer_auth(t)
560 } else {
561 req
562 }
563}
564
565fn authed_delete(
567 client: &reqwest::Client,
568 url: String,
569 token: Option<&str>,
570) -> reqwest::RequestBuilder {
571 let req = client.delete(url);
572 if let Some(t) = token {
573 req.bearer_auth(t)
574 } else {
575 req
576 }
577}
578
579#[cfg_attr(coverage, allow(dead_code))]
581fn authed_put(
582 client: &reqwest::Client,
583 url: String,
584 token: Option<&str>,
585) -> reqwest::RequestBuilder {
586 let req = client.put(url);
587 if let Some(t) = token {
588 req.bearer_auth(t)
589 } else {
590 req
591 }
592}
593
594async fn fetch_output(
596 client: &reqwest::Client,
597 base: &str,
598 name: &str,
599 lines: usize,
600 token: Option<&str>,
601) -> Result<String> {
602 let resp = authed_get(
603 client,
604 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
605 token,
606 )
607 .send()
608 .await?;
609 let text = ok_or_api_error(resp).await?;
610 let output: OutputResponse = serde_json::from_str(&text)?;
611 Ok(output.output)
612}
613
614async fn fetch_session_status(
616 client: &reqwest::Client,
617 base: &str,
618 name: &str,
619 token: Option<&str>,
620) -> Result<String> {
621 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
622 .send()
623 .await?;
624 let text = ok_or_api_error(resp).await?;
625 let session: Session = serde_json::from_str(&text)?;
626 Ok(session.status.to_string())
627}
628
629fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
636 if prev.is_empty() {
637 return new;
638 }
639
640 let prev_lines: Vec<&str> = prev.lines().collect();
641 let new_lines: Vec<&str> = new.lines().collect();
642
643 if new_lines.is_empty() {
644 return "";
645 }
646
647 let last_prev = prev_lines[prev_lines.len() - 1];
649
650 for i in (0..new_lines.len()).rev() {
652 if new_lines[i] == last_prev {
653 let overlap_len = prev_lines.len().min(i + 1);
655 let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
656 let new_overlap = &new_lines[i + 1 - overlap_len..=i];
657 if prev_tail == new_overlap {
658 if i + 1 < new_lines.len() {
659 let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
661 return new.get(consumed.min(new.len())..).unwrap_or("");
662 }
663 return "";
664 }
665 }
666 }
667
668 new
670}
671
672async fn follow_logs(
674 client: &reqwest::Client,
675 base: &str,
676 name: &str,
677 lines: usize,
678 token: Option<&str>,
679 writer: &mut (dyn std::io::Write + Send),
680) -> Result<()> {
681 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
682 write!(writer, "{prev_output}")?;
683
684 let mut unchanged_ticks: u32 = 0;
685
686 loop {
687 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
688
689 let new_output = fetch_output(client, base, name, lines, token).await?;
691
692 let diff = diff_output(&prev_output, &new_output);
693 if diff.is_empty() {
694 unchanged_ticks += 1;
695 } else {
696 write!(writer, "{diff}")?;
697 unchanged_ticks = 0;
698 }
699
700 if new_output.contains(AGENT_EXIT_MARKER) {
702 break;
703 }
704
705 prev_output = new_output;
706
707 if unchanged_ticks >= 3 {
709 let status = fetch_session_status(client, base, name, token).await?;
710 let is_terminal = status == "ready" || status == "killed" || status == "lost";
711 if is_terminal {
712 break;
713 }
714 }
715 }
716 Ok(())
717}
718
719#[cfg(not(coverage))]
723async fn execute_schedule(
724 client: &reqwest::Client,
725 action: &ScheduleAction,
726 base: &str,
727 token: Option<&str>,
728) -> Result<String> {
729 match action {
730 ScheduleAction::Add {
731 name,
732 cron,
733 workdir,
734 node,
735 ink,
736 description,
737 command,
738 } => {
739 let cmd = if command.is_empty() {
740 None
741 } else {
742 Some(command.join(" "))
743 };
744 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
745 std::env::current_dir()
746 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
747 });
748 let mut body = serde_json::json!({
749 "name": name,
750 "cron": cron,
751 "workdir": resolved_workdir,
752 });
753 if let Some(c) = &cmd {
754 body["command"] = serde_json::json!(c);
755 }
756 if let Some(n) = node {
757 body["target_node"] = serde_json::json!(n);
758 }
759 if let Some(i) = ink {
760 body["ink"] = serde_json::json!(i);
761 }
762 if let Some(d) = description {
763 body["description"] = serde_json::json!(d);
764 }
765 let resp = authed_post(client, format!("{base}/api/v1/schedules"), token)
766 .json(&body)
767 .send()
768 .await?;
769 ok_or_api_error(resp).await?;
770 Ok(format!("Created schedule \"{name}\""))
771 }
772 ScheduleAction::List => {
773 let resp = authed_get(client, format!("{base}/api/v1/schedules"), token)
774 .send()
775 .await?;
776 let text = ok_or_api_error(resp).await?;
777 let schedules: Vec<serde_json::Value> = serde_json::from_str(&text)?;
778 Ok(format_schedules(&schedules))
779 }
780 ScheduleAction::Remove { name } => {
781 let resp = authed_delete(client, format!("{base}/api/v1/schedules/{name}"), token)
782 .send()
783 .await?;
784 ok_or_api_error(resp).await?;
785 Ok(format!("Removed schedule \"{name}\""))
786 }
787 ScheduleAction::Pause { name } => {
788 let body = serde_json::json!({ "enabled": false });
789 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
790 .json(&body)
791 .send()
792 .await?;
793 ok_or_api_error(resp).await?;
794 Ok(format!("Paused schedule \"{name}\""))
795 }
796 ScheduleAction::Resume { name } => {
797 let body = serde_json::json!({ "enabled": true });
798 let resp = authed_put(client, format!("{base}/api/v1/schedules/{name}"), token)
799 .json(&body)
800 .send()
801 .await?;
802 ok_or_api_error(resp).await?;
803 Ok(format!("Resumed schedule \"{name}\""))
804 }
805 }
806}
807
808#[cfg(coverage)]
810#[allow(clippy::unnecessary_wraps)]
811async fn execute_schedule(
812 _client: &reqwest::Client,
813 _action: &ScheduleAction,
814 _base: &str,
815 _token: Option<&str>,
816) -> Result<String> {
817 Ok(String::new())
818}
819
820#[cfg_attr(coverage, allow(dead_code))]
822fn format_schedules(schedules: &[serde_json::Value]) -> String {
823 if schedules.is_empty() {
824 return "No schedules.".into();
825 }
826 let mut lines = vec![format!(
827 "{:<20} {:<18} {:<8} {:<12} {}",
828 "NAME", "CRON", "ENABLED", "LAST RUN", "NODE"
829 )];
830 for s in schedules {
831 let name = s["name"].as_str().unwrap_or("?");
832 let cron = s["cron"].as_str().unwrap_or("?");
833 let enabled = if s["enabled"].as_bool().unwrap_or(true) {
834 "yes"
835 } else {
836 "no"
837 };
838 let last_run = s["last_run_at"].as_str().map_or("-", |t| &t[..16]);
839 let node = s["target_node"].as_str().unwrap_or("local");
840 lines.push(format!(
841 "{name:<20} {cron:<18} {enabled:<8} {last_run:<12} {node}"
842 ));
843 }
844 lines.join("\n")
845}
846
847#[cfg(not(coverage))]
851async fn select_best_node(
852 client: &reqwest::Client,
853 base: &str,
854 token: Option<&str>,
855) -> Result<(String, String)> {
856 let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
857 .send()
858 .await?;
859 let text = ok_or_api_error(resp).await?;
860 let peers_resp: PeersResponse = serde_json::from_str(&text)?;
861
862 let mut best: Option<(String, String, f64)> = None; for peer in &peers_resp.peers {
866 if peer.status != pulpo_common::peer::PeerStatus::Online {
867 continue;
868 }
869 let sessions = peer.session_count.unwrap_or(0);
870 let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
871 #[allow(clippy::cast_precision_loss)]
873 let score = sessions as f64 - (mem as f64 / 1024.0);
874 if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
875 best = Some((peer.address.clone(), peer.name.clone(), score));
876 }
877 }
878
879 match best {
881 Some((addr, name, _)) => Ok((addr, name)),
882 None => Ok(("localhost:7433".into(), peers_resp.local.name)),
883 }
884}
885
886#[cfg(coverage)]
888#[allow(clippy::unnecessary_wraps)]
889async fn select_best_node(
890 _client: &reqwest::Client,
891 _base: &str,
892 _token: Option<&str>,
893) -> Result<(String, String)> {
894 Ok(("localhost:7433".into(), "local".into()))
895}
896
897#[allow(clippy::too_many_lines)]
899pub async fn execute(cli: &Cli) -> Result<String> {
900 let client = reqwest::Client::new();
901 let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
902 let url = base_url(&resolved_node);
903 let node = &resolved_node;
904 let token = resolve_token(&client, &url, node, cli.token.as_deref())
905 .await
906 .or(peer_token);
907
908 if cli.command.is_none() {
910 let path = cli.path.as_deref().unwrap_or(".");
911 let resolved_workdir = resolve_path(path);
912 let base_name = derive_session_name(&resolved_workdir);
913 let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
914 let body = serde_json::json!({
915 "name": name,
916 "workdir": resolved_workdir,
917 });
918 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
919 .json(&body)
920 .send()
921 .await
922 .map_err(|e| friendly_error(&e, node))?;
923 let text = ok_or_api_error(resp).await?;
924 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
925 let msg = format!(
926 "Created session \"{}\" ({})",
927 resp.session.name, resp.session.id
928 );
929 let backend_id = resp
930 .session
931 .backend_session_id
932 .as_deref()
933 .unwrap_or(&resp.session.name);
934 eprintln!("{msg}");
935 attach_session(backend_id)?;
936 return Ok(format!("Detached from session \"{}\".", resp.session.name));
937 }
938
939 match cli.command.as_ref().unwrap() {
940 Commands::Attach { name } => {
941 let resp = authed_get(
943 &client,
944 format!("{url}/api/v1/sessions/{name}"),
945 token.as_deref(),
946 )
947 .send()
948 .await
949 .map_err(|e| friendly_error(&e, node))?;
950 let text = ok_or_api_error(resp).await?;
951 let session: Session = serde_json::from_str(&text)?;
952 match session.status.to_string().as_str() {
953 "lost" => {
954 anyhow::bail!(
955 "Session \"{name}\" is lost (agent process died). Resume it first:\n pulpo resume {name}"
956 );
957 }
958 "killed" => {
959 anyhow::bail!(
960 "Session \"{name}\" is {} — cannot attach to a killed session.",
961 session.status
962 );
963 }
964 _ => {}
965 }
966 let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
967 attach_session(&backend_id)?;
968 Ok(format!("Detached from session {name}."))
969 }
970 Commands::Input { name, text } => {
971 let input_text = text.as_deref().unwrap_or("\n");
972 let body = serde_json::json!({ "text": input_text });
973 let resp = authed_post(
974 &client,
975 format!("{url}/api/v1/sessions/{name}/input"),
976 token.as_deref(),
977 )
978 .json(&body)
979 .send()
980 .await
981 .map_err(|e| friendly_error(&e, node))?;
982 ok_or_api_error(resp).await?;
983 Ok(format!("Sent input to session {name}."))
984 }
985 Commands::List => {
986 let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
987 .send()
988 .await
989 .map_err(|e| friendly_error(&e, node))?;
990 let text = ok_or_api_error(resp).await?;
991 let sessions: Vec<Session> = serde_json::from_str(&text)?;
992 Ok(format_sessions(&sessions))
993 }
994 Commands::Nodes => {
995 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
996 .send()
997 .await
998 .map_err(|e| friendly_error(&e, node))?;
999 let text = ok_or_api_error(resp).await?;
1000 let resp: PeersResponse = serde_json::from_str(&text)?;
1001 Ok(format_nodes(&resp))
1002 }
1003 Commands::Spawn {
1004 workdir,
1005 name,
1006 ink,
1007 description,
1008 detach,
1009 idle_threshold,
1010 auto,
1011 worktree,
1012 sandbox,
1013 command,
1014 } => {
1015 let cmd = if command.is_empty() {
1016 None
1017 } else {
1018 Some(command.join(" "))
1019 };
1020 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1022 std::env::current_dir()
1023 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1024 });
1025 let resolved_name = if let Some(n) = name {
1027 n.clone()
1028 } else {
1029 let base_name = derive_session_name(&resolved_workdir);
1030 deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1031 };
1032 let mut body = serde_json::json!({
1033 "name": resolved_name,
1034 "workdir": resolved_workdir,
1035 });
1036 if let Some(c) = &cmd {
1037 body["command"] = serde_json::json!(c);
1038 }
1039 if let Some(i) = ink {
1040 body["ink"] = serde_json::json!(i);
1041 }
1042 if let Some(d) = description {
1043 body["description"] = serde_json::json!(d);
1044 }
1045 if let Some(t) = idle_threshold {
1046 body["idle_threshold_secs"] = serde_json::json!(t);
1047 }
1048 if *worktree {
1049 body["worktree"] = serde_json::json!(true);
1050 }
1051 if *sandbox {
1052 body["sandbox"] = serde_json::json!(true);
1053 }
1054 let spawn_url = if *auto {
1055 let (auto_addr, auto_name) =
1056 select_best_node(&client, &url, token.as_deref()).await?;
1057 eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1058 base_url(&auto_addr)
1059 } else {
1060 url.clone()
1061 };
1062 let resp = authed_post(
1063 &client,
1064 format!("{spawn_url}/api/v1/sessions"),
1065 token.as_deref(),
1066 )
1067 .json(&body)
1068 .send()
1069 .await
1070 .map_err(|e| friendly_error(&e, node))?;
1071 let text = ok_or_api_error(resp).await?;
1072 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1073 let msg = format!(
1074 "Created session \"{}\" ({})",
1075 resp.session.name, resp.session.id
1076 );
1077 if !detach {
1078 let backend_id = resp
1079 .session
1080 .backend_session_id
1081 .as_deref()
1082 .unwrap_or(&resp.session.name);
1083 eprintln!("{msg}");
1084 attach_session(backend_id)?;
1085 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1086 }
1087 Ok(msg)
1088 }
1089 Commands::Kill { name } => {
1090 let resp = authed_post(
1091 &client,
1092 format!("{url}/api/v1/sessions/{name}/kill"),
1093 token.as_deref(),
1094 )
1095 .send()
1096 .await
1097 .map_err(|e| friendly_error(&e, node))?;
1098 ok_or_api_error(resp).await?;
1099 Ok(format!("Session {name} killed."))
1100 }
1101 Commands::Delete { name } => {
1102 let resp = authed_delete(
1103 &client,
1104 format!("{url}/api/v1/sessions/{name}"),
1105 token.as_deref(),
1106 )
1107 .send()
1108 .await
1109 .map_err(|e| friendly_error(&e, node))?;
1110 ok_or_api_error(resp).await?;
1111 Ok(format!("Session {name} deleted."))
1112 }
1113 Commands::Logs {
1114 name,
1115 lines,
1116 follow,
1117 } => {
1118 if *follow {
1119 let mut stdout = std::io::stdout();
1120 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1121 .await
1122 .map_err(|e| {
1123 match e.downcast::<reqwest::Error>() {
1125 Ok(re) => friendly_error(&re, node),
1126 Err(other) => other,
1127 }
1128 })?;
1129 Ok(String::new())
1130 } else {
1131 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1132 .await
1133 .map_err(|e| match e.downcast::<reqwest::Error>() {
1134 Ok(re) => friendly_error(&re, node),
1135 Err(other) => other,
1136 })?;
1137 Ok(output)
1138 }
1139 }
1140 Commands::Interventions { name } => {
1141 let resp = authed_get(
1142 &client,
1143 format!("{url}/api/v1/sessions/{name}/interventions"),
1144 token.as_deref(),
1145 )
1146 .send()
1147 .await
1148 .map_err(|e| friendly_error(&e, node))?;
1149 let text = ok_or_api_error(resp).await?;
1150 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1151 Ok(format_interventions(&events))
1152 }
1153 Commands::Ui => {
1154 let dashboard = base_url(node);
1155 open_browser(&dashboard)?;
1156 Ok(format!("Opening {dashboard}"))
1157 }
1158 Commands::Resume { name } => {
1159 let resp = authed_post(
1160 &client,
1161 format!("{url}/api/v1/sessions/{name}/resume"),
1162 token.as_deref(),
1163 )
1164 .send()
1165 .await
1166 .map_err(|e| friendly_error(&e, node))?;
1167 let text = ok_or_api_error(resp).await?;
1168 let session: Session = serde_json::from_str(&text)?;
1169 let backend_id = session
1170 .backend_session_id
1171 .as_deref()
1172 .unwrap_or(&session.name);
1173 eprintln!("Resumed session \"{}\"", session.name);
1174 attach_session(backend_id)?;
1175 Ok(format!("Detached from session \"{}\".", session.name))
1176 }
1177 Commands::Schedule { action } => execute_schedule(&client, action, &url, token.as_deref())
1178 .await
1179 .map_err(|e| match e.downcast::<reqwest::Error>() {
1180 Ok(re) => friendly_error(&re, node),
1181 Err(other) => other,
1182 }),
1183 }
1184}
1185
1186#[cfg(test)]
1187mod tests {
1188 use super::*;
1189
1190 #[test]
1191 fn test_base_url() {
1192 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1193 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1194 }
1195
1196 #[test]
1197 fn test_cli_parse_list() {
1198 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1199 assert_eq!(cli.node, "localhost:7433");
1200 assert!(matches!(cli.command, Some(Commands::List)));
1201 }
1202
1203 #[test]
1204 fn test_cli_parse_nodes() {
1205 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1206 assert!(matches!(cli.command, Some(Commands::Nodes)));
1207 }
1208
1209 #[test]
1210 fn test_cli_parse_ui() {
1211 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1212 assert!(matches!(cli.command, Some(Commands::Ui)));
1213 }
1214
1215 #[test]
1216 fn test_cli_parse_ui_custom_node() {
1217 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1218 assert_eq!(cli.node, "mac-mini:7433");
1220 assert_eq!(cli.path.as_deref(), Some("ui"));
1221 }
1222
1223 #[test]
1224 fn test_build_open_command() {
1225 let cmd = build_open_command("http://localhost:7433");
1226 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1227 assert_eq!(args, vec!["http://localhost:7433"]);
1228 #[cfg(target_os = "macos")]
1229 assert_eq!(cmd.get_program(), "open");
1230 #[cfg(target_os = "linux")]
1231 assert_eq!(cmd.get_program(), "xdg-open");
1232 }
1233
1234 #[test]
1235 fn test_cli_parse_spawn() {
1236 let cli = Cli::try_parse_from([
1237 "pulpo",
1238 "spawn",
1239 "my-task",
1240 "--workdir",
1241 "/tmp/repo",
1242 "--",
1243 "claude",
1244 "-p",
1245 "Fix the bug",
1246 ])
1247 .unwrap();
1248 assert!(matches!(
1249 &cli.command,
1250 Some(Commands::Spawn { name, workdir, command, .. })
1251 if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1252 && command == &["claude", "-p", "Fix the bug"]
1253 ));
1254 }
1255
1256 #[test]
1257 fn test_cli_parse_spawn_with_ink() {
1258 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1259 assert!(matches!(
1260 &cli.command,
1261 Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1262 ));
1263 }
1264
1265 #[test]
1266 fn test_cli_parse_spawn_with_description() {
1267 let cli =
1268 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1269 .unwrap();
1270 assert!(matches!(
1271 &cli.command,
1272 Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1273 ));
1274 }
1275
1276 #[test]
1277 fn test_cli_parse_spawn_name_positional() {
1278 let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1279 assert!(matches!(
1280 &cli.command,
1281 Some(Commands::Spawn { name, command, .. })
1282 if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1283 ));
1284 }
1285
1286 #[test]
1287 fn test_cli_parse_spawn_no_command() {
1288 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1289 assert!(matches!(
1290 &cli.command,
1291 Some(Commands::Spawn { command, .. }) if command.is_empty()
1292 ));
1293 }
1294
1295 #[test]
1296 fn test_cli_parse_spawn_idle_threshold() {
1297 let cli =
1298 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1299 assert!(matches!(
1300 &cli.command,
1301 Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1302 ));
1303 }
1304
1305 #[test]
1306 fn test_cli_parse_spawn_idle_threshold_60() {
1307 let cli =
1308 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "60"]).unwrap();
1309 assert!(matches!(
1310 &cli.command,
1311 Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(60)
1312 ));
1313 }
1314
1315 #[test]
1316 fn test_cli_parse_spawn_worktree() {
1317 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--worktree"]).unwrap();
1318 assert!(matches!(
1319 &cli.command,
1320 Some(Commands::Spawn { worktree, .. }) if *worktree
1321 ));
1322 }
1323
1324 #[test]
1325 fn test_cli_parse_spawn_detach() {
1326 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
1327 assert!(matches!(
1328 &cli.command,
1329 Some(Commands::Spawn { detach, .. }) if *detach
1330 ));
1331 }
1332
1333 #[test]
1334 fn test_cli_parse_spawn_detach_short() {
1335 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
1336 assert!(matches!(
1337 &cli.command,
1338 Some(Commands::Spawn { detach, .. }) if *detach
1339 ));
1340 }
1341
1342 #[test]
1343 fn test_cli_parse_spawn_detach_default() {
1344 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1345 assert!(matches!(
1346 &cli.command,
1347 Some(Commands::Spawn { detach, .. }) if !detach
1348 ));
1349 }
1350
1351 #[test]
1352 fn test_cli_parse_logs() {
1353 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1354 assert!(matches!(
1355 &cli.command,
1356 Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 100 && !follow
1357 ));
1358 }
1359
1360 #[test]
1361 fn test_cli_parse_logs_with_lines() {
1362 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1363 assert!(matches!(
1364 &cli.command,
1365 Some(Commands::Logs { name, lines, follow }) if name == "my-session" && *lines == 50 && !follow
1366 ));
1367 }
1368
1369 #[test]
1370 fn test_cli_parse_logs_follow() {
1371 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1372 assert!(matches!(
1373 &cli.command,
1374 Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1375 ));
1376 }
1377
1378 #[test]
1379 fn test_cli_parse_logs_follow_short() {
1380 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1381 assert!(matches!(
1382 &cli.command,
1383 Some(Commands::Logs { name, follow, .. }) if name == "my-session" && *follow
1384 ));
1385 }
1386
1387 #[test]
1388 fn test_cli_parse_kill() {
1389 let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1390 assert!(matches!(
1391 &cli.command,
1392 Some(Commands::Kill { name }) if name == "my-session"
1393 ));
1394 }
1395
1396 #[test]
1397 fn test_cli_parse_delete() {
1398 let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1399 assert!(matches!(
1400 &cli.command,
1401 Some(Commands::Delete { name }) if name == "my-session"
1402 ));
1403 }
1404
1405 #[test]
1406 fn test_cli_parse_resume() {
1407 let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1408 assert!(matches!(
1409 &cli.command,
1410 Some(Commands::Resume { name }) if name == "my-session"
1411 ));
1412 }
1413
1414 #[test]
1415 fn test_cli_parse_input() {
1416 let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1417 assert!(matches!(
1418 &cli.command,
1419 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("yes")
1420 ));
1421 }
1422
1423 #[test]
1424 fn test_cli_parse_input_no_text() {
1425 let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1426 assert!(matches!(
1427 &cli.command,
1428 Some(Commands::Input { name, text }) if name == "my-session" && text.is_none()
1429 ));
1430 }
1431
1432 #[test]
1433 fn test_cli_parse_input_alias() {
1434 let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1435 assert!(matches!(
1436 &cli.command,
1437 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
1438 ));
1439 }
1440
1441 #[test]
1442 fn test_cli_parse_custom_node() {
1443 let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1444 assert_eq!(cli.node, "win-pc:8080");
1445 assert_eq!(cli.path.as_deref(), Some("list"));
1447 }
1448
1449 #[test]
1450 fn test_cli_version() {
1451 let result = Cli::try_parse_from(["pulpo", "--version"]);
1452 let err = result.unwrap_err();
1454 assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1455 }
1456
1457 #[test]
1458 fn test_cli_parse_no_subcommand_succeeds() {
1459 let cli = Cli::try_parse_from(["pulpo"]).unwrap();
1460 assert!(cli.command.is_none());
1461 assert!(cli.path.is_none());
1462 }
1463
1464 #[test]
1465 fn test_cli_debug() {
1466 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1467 let debug = format!("{cli:?}");
1468 assert!(debug.contains("List"));
1469 }
1470
1471 #[test]
1472 fn test_commands_debug() {
1473 let cmd = Commands::List;
1474 assert_eq!(format!("{cmd:?}"), "List");
1475 }
1476
1477 const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1479
1480 fn test_create_response_json() -> String {
1482 format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1483 }
1484
1485 async fn start_test_server() -> String {
1487 use axum::http::StatusCode;
1488 use axum::{
1489 Json, Router,
1490 routing::{get, post},
1491 };
1492
1493 let create_json = test_create_response_json();
1494
1495 let app = Router::new()
1496 .route(
1497 "/api/v1/sessions",
1498 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1499 (StatusCode::CREATED, create_json.clone())
1500 }),
1501 )
1502 .route(
1503 "/api/v1/sessions/{id}",
1504 get(|| async { TEST_SESSION_JSON.to_owned() })
1505 .delete(|| async { StatusCode::NO_CONTENT }),
1506 )
1507 .route(
1508 "/api/v1/sessions/{id}/kill",
1509 post(|| async { StatusCode::NO_CONTENT }),
1510 )
1511 .route(
1512 "/api/v1/sessions/{id}/output",
1513 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1514 )
1515 .route(
1516 "/api/v1/peers",
1517 get(|| async {
1518 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1519 }),
1520 )
1521 .route(
1522 "/api/v1/sessions/{id}/resume",
1523 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1524 )
1525 .route(
1526 "/api/v1/sessions/{id}/interventions",
1527 get(|| async { "[]".to_owned() }),
1528 )
1529 .route(
1530 "/api/v1/sessions/{id}/input",
1531 post(|| async { StatusCode::NO_CONTENT }),
1532 )
1533 .route(
1534 "/api/v1/schedules",
1535 get(|| async { Json::<Vec<()>>(vec![]) })
1536 .post(|| async { StatusCode::CREATED }),
1537 )
1538 .route(
1539 "/api/v1/schedules/{id}",
1540 axum::routing::put(|| async { StatusCode::OK })
1541 .delete(|| async { StatusCode::NO_CONTENT }),
1542 );
1543
1544 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1545 let addr = listener.local_addr().unwrap();
1546 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1547 format!("127.0.0.1:{}", addr.port())
1548 }
1549
1550 #[tokio::test]
1551 async fn test_execute_list_success() {
1552 let node = start_test_server().await;
1553 let cli = Cli {
1554 node,
1555 token: None,
1556 command: Some(Commands::List),
1557 path: None,
1558 };
1559 let result = execute(&cli).await.unwrap();
1560 assert_eq!(result, "No sessions.");
1561 }
1562
1563 #[tokio::test]
1564 async fn test_execute_nodes_success() {
1565 let node = start_test_server().await;
1566 let cli = Cli {
1567 node,
1568 token: None,
1569 command: Some(Commands::Nodes),
1570 path: None,
1571 };
1572 let result = execute(&cli).await.unwrap();
1573 assert!(result.contains("test"));
1574 assert!(result.contains("(local)"));
1575 assert!(result.contains("NAME"));
1576 }
1577
1578 #[tokio::test]
1579 async fn test_execute_spawn_success() {
1580 let node = start_test_server().await;
1581 let cli = Cli {
1582 node,
1583 token: None,
1584 command: Some(Commands::Spawn {
1585 name: Some("test".into()),
1586 workdir: Some("/tmp/repo".into()),
1587 ink: None,
1588 description: None,
1589 detach: true,
1590 idle_threshold: None,
1591 auto: false,
1592 worktree: false,
1593 sandbox: false,
1594 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1595 }),
1596 path: None,
1597 };
1598 let result = execute(&cli).await.unwrap();
1599 assert!(result.contains("Created session"));
1600 assert!(result.contains("repo"));
1601 }
1602
1603 #[tokio::test]
1604 async fn test_execute_spawn_with_all_flags() {
1605 let node = start_test_server().await;
1606 let cli = Cli {
1607 node,
1608 token: None,
1609 command: Some(Commands::Spawn {
1610 name: Some("test".into()),
1611 workdir: Some("/tmp/repo".into()),
1612 ink: Some("coder".into()),
1613 description: Some("Fix the bug".into()),
1614 detach: true,
1615 idle_threshold: None,
1616 auto: false,
1617 worktree: false,
1618 sandbox: false,
1619 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1620 }),
1621 path: None,
1622 };
1623 let result = execute(&cli).await.unwrap();
1624 assert!(result.contains("Created session"));
1625 }
1626
1627 #[tokio::test]
1628 async fn test_execute_spawn_no_command() {
1629 let node = start_test_server().await;
1630 let cli = Cli {
1631 node,
1632 token: None,
1633 command: Some(Commands::Spawn {
1634 name: Some("test".into()),
1635 workdir: Some("/tmp/repo".into()),
1636 ink: None,
1637 description: None,
1638 detach: true,
1639 idle_threshold: None,
1640 auto: false,
1641 worktree: false,
1642 sandbox: false,
1643 command: vec![],
1644 }),
1645 path: None,
1646 };
1647 let result = execute(&cli).await.unwrap();
1648 assert!(result.contains("Created session"));
1649 }
1650
1651 #[tokio::test]
1652 async fn test_execute_spawn_with_name() {
1653 let node = start_test_server().await;
1654 let cli = Cli {
1655 node,
1656 token: None,
1657 command: Some(Commands::Spawn {
1658 name: Some("my-task".into()),
1659 workdir: Some("/tmp/repo".into()),
1660 ink: None,
1661 description: None,
1662 detach: true,
1663 idle_threshold: None,
1664 auto: false,
1665 worktree: false,
1666 sandbox: false,
1667 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1668 }),
1669 path: None,
1670 };
1671 let result = execute(&cli).await.unwrap();
1672 assert!(result.contains("Created session"));
1673 }
1674
1675 #[tokio::test]
1676 async fn test_execute_spawn_auto_attach() {
1677 let node = start_test_server().await;
1678 let cli = Cli {
1679 node,
1680 token: None,
1681 command: Some(Commands::Spawn {
1682 name: Some("test".into()),
1683 workdir: Some("/tmp/repo".into()),
1684 ink: None,
1685 description: None,
1686 detach: false,
1687 idle_threshold: None,
1688 auto: false,
1689 worktree: false,
1690 sandbox: false,
1691 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1692 }),
1693 path: None,
1694 };
1695 let result = execute(&cli).await.unwrap();
1696 assert!(result.contains("Detached from session"));
1698 }
1699
1700 #[tokio::test]
1701 async fn test_execute_kill_success() {
1702 let node = start_test_server().await;
1703 let cli = Cli {
1704 node,
1705 token: None,
1706 command: Some(Commands::Kill {
1707 name: "test-session".into(),
1708 }),
1709 path: None,
1710 };
1711 let result = execute(&cli).await.unwrap();
1712 assert!(result.contains("killed"));
1713 }
1714
1715 #[tokio::test]
1716 async fn test_execute_delete_success() {
1717 let node = start_test_server().await;
1718 let cli = Cli {
1719 node,
1720 token: None,
1721 command: Some(Commands::Delete {
1722 name: "test-session".into(),
1723 }),
1724 path: None,
1725 };
1726 let result = execute(&cli).await.unwrap();
1727 assert!(result.contains("deleted"));
1728 }
1729
1730 #[tokio::test]
1731 async fn test_execute_logs_success() {
1732 let node = start_test_server().await;
1733 let cli = Cli {
1734 node,
1735 token: None,
1736 command: Some(Commands::Logs {
1737 name: "test-session".into(),
1738 lines: 50,
1739 follow: false,
1740 }),
1741 path: None,
1742 };
1743 let result = execute(&cli).await.unwrap();
1744 assert!(result.contains("test output"));
1745 }
1746
1747 #[tokio::test]
1748 async fn test_execute_list_connection_refused() {
1749 let cli = Cli {
1750 node: "localhost:1".into(),
1751 token: None,
1752 command: Some(Commands::List),
1753 path: None,
1754 };
1755 let result = execute(&cli).await;
1756 let err = result.unwrap_err().to_string();
1757 assert!(
1758 err.contains("Could not connect to pulpod"),
1759 "Expected friendly error, got: {err}"
1760 );
1761 assert!(err.contains("localhost:1"));
1762 }
1763
1764 #[tokio::test]
1765 async fn test_execute_nodes_connection_refused() {
1766 let cli = Cli {
1767 node: "localhost:1".into(),
1768 token: None,
1769 command: Some(Commands::Nodes),
1770 path: None,
1771 };
1772 let result = execute(&cli).await;
1773 let err = result.unwrap_err().to_string();
1774 assert!(err.contains("Could not connect to pulpod"));
1775 }
1776
1777 #[tokio::test]
1778 async fn test_execute_kill_error_response() {
1779 use axum::{Router, http::StatusCode, routing::post};
1780
1781 let app = Router::new().route(
1782 "/api/v1/sessions/{id}/kill",
1783 post(|| async {
1784 (
1785 StatusCode::NOT_FOUND,
1786 "{\"error\":\"session not found: test-session\"}",
1787 )
1788 }),
1789 );
1790 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1791 let addr = listener.local_addr().unwrap();
1792 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1793 let node = format!("127.0.0.1:{}", addr.port());
1794
1795 let cli = Cli {
1796 node,
1797 token: None,
1798 command: Some(Commands::Kill {
1799 name: "test-session".into(),
1800 }),
1801 path: None,
1802 };
1803 let err = execute(&cli).await.unwrap_err();
1804 assert_eq!(err.to_string(), "session not found: test-session");
1805 }
1806
1807 #[tokio::test]
1808 async fn test_execute_delete_error_response() {
1809 use axum::{Router, http::StatusCode, routing::delete};
1810
1811 let app = Router::new().route(
1812 "/api/v1/sessions/{id}",
1813 delete(|| async {
1814 (
1815 StatusCode::CONFLICT,
1816 "{\"error\":\"cannot delete session in 'running' state\"}",
1817 )
1818 }),
1819 );
1820 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1821 let addr = listener.local_addr().unwrap();
1822 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1823 let node = format!("127.0.0.1:{}", addr.port());
1824
1825 let cli = Cli {
1826 node,
1827 token: None,
1828 command: Some(Commands::Delete {
1829 name: "test-session".into(),
1830 }),
1831 path: None,
1832 };
1833 let err = execute(&cli).await.unwrap_err();
1834 assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1835 }
1836
1837 #[tokio::test]
1838 async fn test_execute_logs_error_response() {
1839 use axum::{Router, http::StatusCode, routing::get};
1840
1841 let app = Router::new().route(
1842 "/api/v1/sessions/{id}/output",
1843 get(|| async {
1844 (
1845 StatusCode::NOT_FOUND,
1846 "{\"error\":\"session not found: ghost\"}",
1847 )
1848 }),
1849 );
1850 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1851 let addr = listener.local_addr().unwrap();
1852 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1853 let node = format!("127.0.0.1:{}", addr.port());
1854
1855 let cli = Cli {
1856 node,
1857 token: None,
1858 command: Some(Commands::Logs {
1859 name: "ghost".into(),
1860 lines: 50,
1861 follow: false,
1862 }),
1863 path: None,
1864 };
1865 let err = execute(&cli).await.unwrap_err();
1866 assert_eq!(err.to_string(), "session not found: ghost");
1867 }
1868
1869 #[tokio::test]
1870 async fn test_execute_resume_error_response() {
1871 use axum::{Router, http::StatusCode, routing::post};
1872
1873 let app = Router::new().route(
1874 "/api/v1/sessions/{id}/resume",
1875 post(|| async {
1876 (
1877 StatusCode::BAD_REQUEST,
1878 "{\"error\":\"session is not lost (status: active)\"}",
1879 )
1880 }),
1881 );
1882 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1883 let addr = listener.local_addr().unwrap();
1884 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1885 let node = format!("127.0.0.1:{}", addr.port());
1886
1887 let cli = Cli {
1888 node,
1889 token: None,
1890 command: Some(Commands::Resume {
1891 name: "test-session".into(),
1892 }),
1893 path: None,
1894 };
1895 let err = execute(&cli).await.unwrap_err();
1896 assert_eq!(err.to_string(), "session is not lost (status: active)");
1897 }
1898
1899 #[tokio::test]
1900 async fn test_execute_spawn_error_response() {
1901 use axum::{Router, http::StatusCode, routing::post};
1902
1903 let app = Router::new().route(
1904 "/api/v1/sessions",
1905 post(|| async {
1906 (
1907 StatusCode::INTERNAL_SERVER_ERROR,
1908 "{\"error\":\"failed to spawn session\"}",
1909 )
1910 }),
1911 );
1912 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1913 let addr = listener.local_addr().unwrap();
1914 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1915 let node = format!("127.0.0.1:{}", addr.port());
1916
1917 let cli = Cli {
1918 node,
1919 token: None,
1920 command: Some(Commands::Spawn {
1921 name: Some("test".into()),
1922 workdir: Some("/tmp/repo".into()),
1923 ink: None,
1924 description: None,
1925 detach: true,
1926 idle_threshold: None,
1927 auto: false,
1928 worktree: false,
1929 sandbox: false,
1930 command: vec!["test".into()],
1931 }),
1932 path: None,
1933 };
1934 let err = execute(&cli).await.unwrap_err();
1935 assert_eq!(err.to_string(), "failed to spawn session");
1936 }
1937
1938 #[tokio::test]
1939 async fn test_execute_interventions_error_response() {
1940 use axum::{Router, http::StatusCode, routing::get};
1941
1942 let app = Router::new().route(
1943 "/api/v1/sessions/{id}/interventions",
1944 get(|| async {
1945 (
1946 StatusCode::NOT_FOUND,
1947 "{\"error\":\"session not found: ghost\"}",
1948 )
1949 }),
1950 );
1951 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1952 let addr = listener.local_addr().unwrap();
1953 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1954 let node = format!("127.0.0.1:{}", addr.port());
1955
1956 let cli = Cli {
1957 node,
1958 token: None,
1959 command: Some(Commands::Interventions {
1960 name: "ghost".into(),
1961 }),
1962 path: None,
1963 };
1964 let err = execute(&cli).await.unwrap_err();
1965 assert_eq!(err.to_string(), "session not found: ghost");
1966 }
1967
1968 #[tokio::test]
1969 async fn test_execute_resume_success() {
1970 let node = start_test_server().await;
1971 let cli = Cli {
1972 node,
1973 token: None,
1974 command: Some(Commands::Resume {
1975 name: "test-session".into(),
1976 }),
1977 path: None,
1978 };
1979 let result = execute(&cli).await.unwrap();
1980 assert!(result.contains("Detached from session"));
1981 }
1982
1983 #[tokio::test]
1984 async fn test_execute_input_success() {
1985 let node = start_test_server().await;
1986 let cli = Cli {
1987 node,
1988 token: None,
1989 command: Some(Commands::Input {
1990 name: "test-session".into(),
1991 text: Some("yes".into()),
1992 }),
1993 path: None,
1994 };
1995 let result = execute(&cli).await.unwrap();
1996 assert!(result.contains("Sent input to session test-session"));
1997 }
1998
1999 #[tokio::test]
2000 async fn test_execute_input_no_text() {
2001 let node = start_test_server().await;
2002 let cli = Cli {
2003 node,
2004 token: None,
2005 command: Some(Commands::Input {
2006 name: "test-session".into(),
2007 text: None,
2008 }),
2009 path: None,
2010 };
2011 let result = execute(&cli).await.unwrap();
2012 assert!(result.contains("Sent input to session test-session"));
2013 }
2014
2015 #[tokio::test]
2016 async fn test_execute_input_connection_refused() {
2017 let cli = Cli {
2018 node: "localhost:1".into(),
2019 token: None,
2020 command: Some(Commands::Input {
2021 name: "test".into(),
2022 text: Some("y".into()),
2023 }),
2024 path: None,
2025 };
2026 let result = execute(&cli).await;
2027 let err = result.unwrap_err().to_string();
2028 assert!(err.contains("Could not connect to pulpod"));
2029 }
2030
2031 #[tokio::test]
2032 async fn test_execute_input_error_response() {
2033 use axum::{Router, http::StatusCode, routing::post};
2034
2035 let app = Router::new().route(
2036 "/api/v1/sessions/{id}/input",
2037 post(|| async {
2038 (
2039 StatusCode::NOT_FOUND,
2040 "{\"error\":\"session not found: ghost\"}",
2041 )
2042 }),
2043 );
2044 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2045 let addr = listener.local_addr().unwrap();
2046 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2047 let node = format!("127.0.0.1:{}", addr.port());
2048
2049 let cli = Cli {
2050 node,
2051 token: None,
2052 command: Some(Commands::Input {
2053 name: "ghost".into(),
2054 text: Some("y".into()),
2055 }),
2056 path: None,
2057 };
2058 let err = execute(&cli).await.unwrap_err();
2059 assert_eq!(err.to_string(), "session not found: ghost");
2060 }
2061
2062 #[tokio::test]
2063 async fn test_execute_ui() {
2064 let cli = Cli {
2065 node: "localhost:7433".into(),
2066 token: None,
2067 command: Some(Commands::Ui),
2068 path: None,
2069 };
2070 let result = execute(&cli).await.unwrap();
2071 assert!(result.contains("Opening"));
2072 assert!(result.contains("http://localhost:7433"));
2073 }
2074
2075 #[tokio::test]
2076 async fn test_execute_ui_custom_node() {
2077 let cli = Cli {
2078 node: "mac-mini:7433".into(),
2079 token: None,
2080 command: Some(Commands::Ui),
2081 path: None,
2082 };
2083 let result = execute(&cli).await.unwrap();
2084 assert!(result.contains("http://mac-mini:7433"));
2085 }
2086
2087 #[test]
2088 fn test_format_sessions_empty() {
2089 assert_eq!(format_sessions(&[]), "No sessions.");
2090 }
2091
2092 #[test]
2093 fn test_format_sessions_with_data() {
2094 use chrono::Utc;
2095 use pulpo_common::session::SessionStatus;
2096 use uuid::Uuid;
2097
2098 let sessions = vec![Session {
2099 id: Uuid::nil(),
2100 name: "my-api".into(),
2101 workdir: "/tmp/repo".into(),
2102 command: "claude -p 'Fix the bug'".into(),
2103 description: Some("Fix the bug".into()),
2104 status: SessionStatus::Active,
2105 exit_code: None,
2106 backend_session_id: None,
2107 output_snapshot: None,
2108 metadata: None,
2109 ink: None,
2110 intervention_code: None,
2111 intervention_reason: None,
2112 intervention_at: None,
2113 last_output_at: None,
2114 idle_since: None,
2115 idle_threshold_secs: None,
2116 worktree_path: None,
2117 sandbox: false,
2118 created_at: Utc::now(),
2119 updated_at: Utc::now(),
2120 }];
2121 let output = format_sessions(&sessions);
2122 assert!(output.contains("NAME"));
2123 assert!(output.contains("COMMAND"));
2124 assert!(output.contains("my-api"));
2125 assert!(output.contains("active"));
2126 assert!(output.contains("claude -p 'Fix the bug'"));
2127 }
2128
2129 #[test]
2130 fn test_format_sessions_long_command_truncated() {
2131 use chrono::Utc;
2132 use pulpo_common::session::SessionStatus;
2133 use uuid::Uuid;
2134
2135 let sessions = vec![Session {
2136 id: Uuid::nil(),
2137 name: "test".into(),
2138 workdir: "/tmp".into(),
2139 command:
2140 "claude -p 'A very long command that exceeds fifty characters in total length here'"
2141 .into(),
2142 description: None,
2143 status: SessionStatus::Ready,
2144 exit_code: None,
2145 backend_session_id: None,
2146 output_snapshot: None,
2147 metadata: None,
2148 ink: None,
2149 intervention_code: None,
2150 intervention_reason: None,
2151 intervention_at: None,
2152 last_output_at: None,
2153 idle_since: None,
2154 idle_threshold_secs: None,
2155 worktree_path: None,
2156 sandbox: false,
2157 created_at: Utc::now(),
2158 updated_at: Utc::now(),
2159 }];
2160 let output = format_sessions(&sessions);
2161 assert!(output.contains("..."));
2162 }
2163
2164 #[test]
2165 fn test_format_nodes() {
2166 use pulpo_common::node::NodeInfo;
2167 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2168
2169 let resp = PeersResponse {
2170 local: NodeInfo {
2171 name: "mac-mini".into(),
2172 hostname: "h".into(),
2173 os: "macos".into(),
2174 arch: "arm64".into(),
2175 cpus: 8,
2176 memory_mb: 16384,
2177 gpu: None,
2178 },
2179 peers: vec![PeerInfo {
2180 name: "win-pc".into(),
2181 address: "win-pc:7433".into(),
2182 status: PeerStatus::Online,
2183 node_info: None,
2184 session_count: Some(3),
2185 source: PeerSource::Configured,
2186 }],
2187 };
2188 let output = format_nodes(&resp);
2189 assert!(output.contains("mac-mini"));
2190 assert!(output.contains("(local)"));
2191 assert!(output.contains("win-pc"));
2192 assert!(output.contains('3'));
2193 }
2194
2195 #[test]
2196 fn test_format_nodes_no_session_count() {
2197 use pulpo_common::node::NodeInfo;
2198 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2199
2200 let resp = PeersResponse {
2201 local: NodeInfo {
2202 name: "local".into(),
2203 hostname: "h".into(),
2204 os: "linux".into(),
2205 arch: "x86_64".into(),
2206 cpus: 4,
2207 memory_mb: 8192,
2208 gpu: None,
2209 },
2210 peers: vec![PeerInfo {
2211 name: "peer".into(),
2212 address: "peer:7433".into(),
2213 status: PeerStatus::Offline,
2214 node_info: None,
2215 session_count: None,
2216 source: PeerSource::Configured,
2217 }],
2218 };
2219 let output = format_nodes(&resp);
2220 assert!(output.contains("offline"));
2221 let lines: Vec<&str> = output.lines().collect();
2223 assert!(lines[2].contains('-'));
2224 }
2225
2226 #[tokio::test]
2227 async fn test_execute_resume_connection_refused() {
2228 let cli = Cli {
2229 node: "localhost:1".into(),
2230 token: None,
2231 command: Some(Commands::Resume {
2232 name: "test".into(),
2233 }),
2234 path: None,
2235 };
2236 let result = execute(&cli).await;
2237 let err = result.unwrap_err().to_string();
2238 assert!(err.contains("Could not connect to pulpod"));
2239 }
2240
2241 #[tokio::test]
2242 async fn test_execute_spawn_connection_refused() {
2243 let cli = Cli {
2244 node: "localhost:1".into(),
2245 token: None,
2246 command: Some(Commands::Spawn {
2247 name: Some("test".into()),
2248 workdir: Some("/tmp".into()),
2249 ink: None,
2250 description: None,
2251 detach: true,
2252 idle_threshold: None,
2253 auto: false,
2254 worktree: false,
2255 sandbox: false,
2256 command: vec!["test".into()],
2257 }),
2258 path: None,
2259 };
2260 let result = execute(&cli).await;
2261 let err = result.unwrap_err().to_string();
2262 assert!(err.contains("Could not connect to pulpod"));
2263 }
2264
2265 #[tokio::test]
2266 async fn test_execute_kill_connection_refused() {
2267 let cli = Cli {
2268 node: "localhost:1".into(),
2269 token: None,
2270 command: Some(Commands::Kill {
2271 name: "test".into(),
2272 }),
2273 path: None,
2274 };
2275 let result = execute(&cli).await;
2276 let err = result.unwrap_err().to_string();
2277 assert!(err.contains("Could not connect to pulpod"));
2278 }
2279
2280 #[tokio::test]
2281 async fn test_execute_delete_connection_refused() {
2282 let cli = Cli {
2283 node: "localhost:1".into(),
2284 token: None,
2285 command: Some(Commands::Delete {
2286 name: "test".into(),
2287 }),
2288 path: None,
2289 };
2290 let result = execute(&cli).await;
2291 let err = result.unwrap_err().to_string();
2292 assert!(err.contains("Could not connect to pulpod"));
2293 }
2294
2295 #[tokio::test]
2296 async fn test_execute_logs_connection_refused() {
2297 let cli = Cli {
2298 node: "localhost:1".into(),
2299 token: None,
2300 command: Some(Commands::Logs {
2301 name: "test".into(),
2302 lines: 50,
2303 follow: false,
2304 }),
2305 path: None,
2306 };
2307 let result = execute(&cli).await;
2308 let err = result.unwrap_err().to_string();
2309 assert!(err.contains("Could not connect to pulpod"));
2310 }
2311
2312 #[tokio::test]
2313 async fn test_friendly_error_connect() {
2314 let err = reqwest::Client::new()
2316 .get("http://127.0.0.1:1")
2317 .send()
2318 .await
2319 .unwrap_err();
2320 let friendly = friendly_error(&err, "test-node:1");
2321 let msg = friendly.to_string();
2322 assert!(
2323 msg.contains("Could not connect"),
2324 "Expected connect message, got: {msg}"
2325 );
2326 }
2327
2328 #[tokio::test]
2329 async fn test_friendly_error_other() {
2330 let err = reqwest::Client::new()
2332 .get("http://[::invalid::url")
2333 .send()
2334 .await
2335 .unwrap_err();
2336 let friendly = friendly_error(&err, "bad-host");
2337 let msg = friendly.to_string();
2338 assert!(
2339 msg.contains("Network error"),
2340 "Expected network error message, got: {msg}"
2341 );
2342 assert!(msg.contains("bad-host"));
2343 }
2344
2345 #[test]
2348 fn test_is_localhost_variants() {
2349 assert!(is_localhost("localhost:7433"));
2350 assert!(is_localhost("127.0.0.1:7433"));
2351 assert!(is_localhost("[::1]:7433"));
2352 assert!(is_localhost("::1"));
2353 assert!(is_localhost("localhost"));
2354 assert!(!is_localhost("mac-mini:7433"));
2355 assert!(!is_localhost("192.168.1.100:7433"));
2356 }
2357
2358 #[test]
2359 fn test_authed_get_with_token() {
2360 let client = reqwest::Client::new();
2361 let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2362 .build()
2363 .unwrap();
2364 let auth = req
2365 .headers()
2366 .get("authorization")
2367 .unwrap()
2368 .to_str()
2369 .unwrap();
2370 assert_eq!(auth, "Bearer tok");
2371 }
2372
2373 #[test]
2374 fn test_authed_get_without_token() {
2375 let client = reqwest::Client::new();
2376 let req = authed_get(&client, "http://h:1/api".into(), None)
2377 .build()
2378 .unwrap();
2379 assert!(req.headers().get("authorization").is_none());
2380 }
2381
2382 #[test]
2383 fn test_authed_post_with_token() {
2384 let client = reqwest::Client::new();
2385 let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2386 .build()
2387 .unwrap();
2388 let auth = req
2389 .headers()
2390 .get("authorization")
2391 .unwrap()
2392 .to_str()
2393 .unwrap();
2394 assert_eq!(auth, "Bearer secret");
2395 }
2396
2397 #[test]
2398 fn test_authed_post_without_token() {
2399 let client = reqwest::Client::new();
2400 let req = authed_post(&client, "http://h:1/api".into(), None)
2401 .build()
2402 .unwrap();
2403 assert!(req.headers().get("authorization").is_none());
2404 }
2405
2406 #[test]
2407 fn test_authed_delete_with_token() {
2408 let client = reqwest::Client::new();
2409 let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2410 .build()
2411 .unwrap();
2412 let auth = req
2413 .headers()
2414 .get("authorization")
2415 .unwrap()
2416 .to_str()
2417 .unwrap();
2418 assert_eq!(auth, "Bearer del-tok");
2419 }
2420
2421 #[test]
2422 fn test_authed_delete_without_token() {
2423 let client = reqwest::Client::new();
2424 let req = authed_delete(&client, "http://h:1/api".into(), None)
2425 .build()
2426 .unwrap();
2427 assert!(req.headers().get("authorization").is_none());
2428 }
2429
2430 #[tokio::test]
2431 async fn test_resolve_token_explicit() {
2432 let client = reqwest::Client::new();
2433 let token =
2434 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2435 assert_eq!(token, Some("my-tok".into()));
2436 }
2437
2438 #[tokio::test]
2439 async fn test_resolve_token_remote_no_explicit() {
2440 let client = reqwest::Client::new();
2441 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2442 assert_eq!(token, None);
2443 }
2444
2445 #[tokio::test]
2446 async fn test_resolve_token_localhost_auto_discover() {
2447 use axum::{Json, Router, routing::get};
2448
2449 let app = Router::new().route(
2450 "/api/v1/auth/token",
2451 get(|| async {
2452 Json(AuthTokenResponse {
2453 token: "discovered".into(),
2454 })
2455 }),
2456 );
2457 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2458 let addr = listener.local_addr().unwrap();
2459 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2460
2461 let node = format!("localhost:{}", addr.port());
2462 let base = base_url(&node);
2463 let client = reqwest::Client::new();
2464 let token = resolve_token(&client, &base, &node, None).await;
2465 assert_eq!(token, Some("discovered".into()));
2466 }
2467
2468 #[tokio::test]
2469 async fn test_discover_token_empty_returns_none() {
2470 use axum::{Json, Router, routing::get};
2471
2472 let app = Router::new().route(
2473 "/api/v1/auth/token",
2474 get(|| async {
2475 Json(AuthTokenResponse {
2476 token: String::new(),
2477 })
2478 }),
2479 );
2480 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2481 let addr = listener.local_addr().unwrap();
2482 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2483
2484 let base = format!("http://127.0.0.1:{}", addr.port());
2485 let client = reqwest::Client::new();
2486 assert_eq!(discover_token(&client, &base).await, None);
2487 }
2488
2489 #[tokio::test]
2490 async fn test_discover_token_unreachable_returns_none() {
2491 let client = reqwest::Client::new();
2492 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2493 }
2494
2495 #[test]
2496 fn test_cli_parse_with_token() {
2497 let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
2498 assert_eq!(cli.token, Some("my-secret".into()));
2499 }
2500
2501 #[test]
2502 fn test_cli_parse_without_token() {
2503 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2504 assert_eq!(cli.token, None);
2505 }
2506
2507 #[tokio::test]
2508 async fn test_execute_with_explicit_token_sends_header() {
2509 use axum::{Router, extract::Request, http::StatusCode, routing::get};
2510
2511 let app = Router::new().route(
2512 "/api/v1/sessions",
2513 get(|req: Request| async move {
2514 let auth = req
2515 .headers()
2516 .get("authorization")
2517 .and_then(|v| v.to_str().ok())
2518 .unwrap_or("");
2519 assert_eq!(auth, "Bearer test-token");
2520 (StatusCode::OK, "[]".to_owned())
2521 }),
2522 );
2523 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2524 let addr = listener.local_addr().unwrap();
2525 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2526 let node = format!("127.0.0.1:{}", addr.port());
2527
2528 let cli = Cli {
2529 node,
2530 token: Some("test-token".into()),
2531 command: Some(Commands::List),
2532 path: None,
2533 };
2534 let result = execute(&cli).await.unwrap();
2535 assert_eq!(result, "No sessions.");
2536 }
2537
2538 #[test]
2541 fn test_cli_parse_interventions() {
2542 let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
2543 assert!(matches!(
2544 &cli.command,
2545 Some(Commands::Interventions { name }) if name == "my-session"
2546 ));
2547 }
2548
2549 #[test]
2550 fn test_format_interventions_empty() {
2551 assert_eq!(format_interventions(&[]), "No intervention events.");
2552 }
2553
2554 #[test]
2555 fn test_format_interventions_with_data() {
2556 let events = vec![
2557 InterventionEventResponse {
2558 id: 1,
2559 session_id: "sess-1".into(),
2560 code: None,
2561 reason: "Memory exceeded threshold".into(),
2562 created_at: "2026-01-01T00:00:00Z".into(),
2563 },
2564 InterventionEventResponse {
2565 id: 2,
2566 session_id: "sess-1".into(),
2567 code: None,
2568 reason: "Idle for 10 minutes".into(),
2569 created_at: "2026-01-02T00:00:00Z".into(),
2570 },
2571 ];
2572 let output = format_interventions(&events);
2573 assert!(output.contains("ID"));
2574 assert!(output.contains("TIMESTAMP"));
2575 assert!(output.contains("REASON"));
2576 assert!(output.contains("Memory exceeded threshold"));
2577 assert!(output.contains("Idle for 10 minutes"));
2578 assert!(output.contains("2026-01-01T00:00:00Z"));
2579 }
2580
2581 #[tokio::test]
2582 async fn test_execute_interventions_empty() {
2583 let node = start_test_server().await;
2584 let cli = Cli {
2585 node,
2586 token: None,
2587 command: Some(Commands::Interventions {
2588 name: "my-session".into(),
2589 }),
2590 path: None,
2591 };
2592 let result = execute(&cli).await.unwrap();
2593 assert_eq!(result, "No intervention events.");
2594 }
2595
2596 #[tokio::test]
2597 async fn test_execute_interventions_with_data() {
2598 use axum::{Router, routing::get};
2599
2600 let app = Router::new().route(
2601 "/api/v1/sessions/{id}/interventions",
2602 get(|| async {
2603 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2604 .to_owned()
2605 }),
2606 );
2607 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2608 let addr = listener.local_addr().unwrap();
2609 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2610 let node = format!("127.0.0.1:{}", addr.port());
2611
2612 let cli = Cli {
2613 node,
2614 token: None,
2615 command: Some(Commands::Interventions {
2616 name: "test".into(),
2617 }),
2618 path: None,
2619 };
2620 let result = execute(&cli).await.unwrap();
2621 assert!(result.contains("OOM"));
2622 assert!(result.contains("2026-01-01T00:00:00Z"));
2623 }
2624
2625 #[tokio::test]
2626 async fn test_execute_interventions_connection_refused() {
2627 let cli = Cli {
2628 node: "localhost:1".into(),
2629 token: None,
2630 command: Some(Commands::Interventions {
2631 name: "test".into(),
2632 }),
2633 path: None,
2634 };
2635 let result = execute(&cli).await;
2636 let err = result.unwrap_err().to_string();
2637 assert!(err.contains("Could not connect to pulpod"));
2638 }
2639
2640 #[test]
2643 fn test_build_attach_command() {
2644 let cmd = build_attach_command("my-session");
2645 assert_eq!(cmd.get_program(), "tmux");
2646 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2647 assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
2648 }
2649
2650 #[test]
2651 fn test_cli_parse_attach() {
2652 let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2653 assert!(matches!(
2654 &cli.command,
2655 Some(Commands::Attach { name }) if name == "my-session"
2656 ));
2657 }
2658
2659 #[test]
2660 fn test_cli_parse_attach_alias() {
2661 let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2662 assert!(matches!(
2663 &cli.command,
2664 Some(Commands::Attach { name }) if name == "my-session"
2665 ));
2666 }
2667
2668 #[tokio::test]
2669 async fn test_execute_attach_success() {
2670 let node = start_test_server().await;
2671 let cli = Cli {
2672 node,
2673 token: None,
2674 command: Some(Commands::Attach {
2675 name: "test-session".into(),
2676 }),
2677 path: None,
2678 };
2679 let result = execute(&cli).await.unwrap();
2680 assert!(result.contains("Detached from session test-session"));
2681 }
2682
2683 #[tokio::test]
2684 async fn test_execute_attach_with_backend_session_id() {
2685 use axum::{Router, routing::get};
2686 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2687 let app = Router::new().route(
2688 "/api/v1/sessions/{id}",
2689 get(move || async move { session_json.to_owned() }),
2690 );
2691 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2692 let addr = listener.local_addr().unwrap();
2693 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2694
2695 let cli = Cli {
2696 node: format!("127.0.0.1:{}", addr.port()),
2697 token: None,
2698 command: Some(Commands::Attach {
2699 name: "my-session".into(),
2700 }),
2701 path: None,
2702 };
2703 let result = execute(&cli).await.unwrap();
2704 assert!(result.contains("Detached from session my-session"));
2705 }
2706
2707 #[tokio::test]
2708 async fn test_execute_attach_connection_refused() {
2709 let cli = Cli {
2710 node: "localhost:1".into(),
2711 token: None,
2712 command: Some(Commands::Attach {
2713 name: "test-session".into(),
2714 }),
2715 path: None,
2716 };
2717 let result = execute(&cli).await;
2718 let err = result.unwrap_err().to_string();
2719 assert!(err.contains("Could not connect to pulpod"));
2720 }
2721
2722 #[tokio::test]
2723 async fn test_execute_attach_error_response() {
2724 use axum::{Router, http::StatusCode, routing::get};
2725 let app = Router::new().route(
2726 "/api/v1/sessions/{id}",
2727 get(|| async {
2728 (
2729 StatusCode::NOT_FOUND,
2730 r#"{"error":"session not found"}"#.to_owned(),
2731 )
2732 }),
2733 );
2734 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2735 let addr = listener.local_addr().unwrap();
2736 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2737
2738 let cli = Cli {
2739 node: format!("127.0.0.1:{}", addr.port()),
2740 token: None,
2741 command: Some(Commands::Attach {
2742 name: "nonexistent".into(),
2743 }),
2744 path: None,
2745 };
2746 let result = execute(&cli).await;
2747 let err = result.unwrap_err().to_string();
2748 assert!(err.contains("session not found"));
2749 }
2750
2751 #[tokio::test]
2752 async fn test_execute_attach_stale_session() {
2753 use axum::{Router, routing::get};
2754 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2755 let app = Router::new().route(
2756 "/api/v1/sessions/{id}",
2757 get(move || async move { session_json.to_owned() }),
2758 );
2759 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2760 let addr = listener.local_addr().unwrap();
2761 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2762
2763 let cli = Cli {
2764 node: format!("127.0.0.1:{}", addr.port()),
2765 token: None,
2766 command: Some(Commands::Attach {
2767 name: "stale-sess".into(),
2768 }),
2769 path: None,
2770 };
2771 let result = execute(&cli).await;
2772 let err = result.unwrap_err().to_string();
2773 assert!(err.contains("lost"));
2774 assert!(err.contains("pulpo resume"));
2775 }
2776
2777 #[tokio::test]
2778 async fn test_execute_attach_dead_session() {
2779 use axum::{Router, routing::get};
2780 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2781 let app = Router::new().route(
2782 "/api/v1/sessions/{id}",
2783 get(move || async move { session_json.to_owned() }),
2784 );
2785 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2786 let addr = listener.local_addr().unwrap();
2787 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2788
2789 let cli = Cli {
2790 node: format!("127.0.0.1:{}", addr.port()),
2791 token: None,
2792 command: Some(Commands::Attach {
2793 name: "dead-sess".into(),
2794 }),
2795 path: None,
2796 };
2797 let result = execute(&cli).await;
2798 let err = result.unwrap_err().to_string();
2799 assert!(err.contains("killed"));
2800 assert!(err.contains("cannot attach"));
2801 }
2802
2803 #[test]
2806 fn test_cli_parse_alias_spawn() {
2807 let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
2808 assert!(matches!(&cli.command, Some(Commands::Spawn { .. })));
2809 }
2810
2811 #[test]
2812 fn test_cli_parse_alias_list() {
2813 let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2814 assert!(matches!(&cli.command, Some(Commands::List)));
2815 }
2816
2817 #[test]
2818 fn test_cli_parse_alias_logs() {
2819 let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2820 assert!(matches!(
2821 &cli.command,
2822 Some(Commands::Logs { name, .. }) if name == "my-session"
2823 ));
2824 }
2825
2826 #[test]
2827 fn test_cli_parse_alias_kill() {
2828 let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2829 assert!(matches!(
2830 &cli.command,
2831 Some(Commands::Kill { name }) if name == "my-session"
2832 ));
2833 }
2834
2835 #[test]
2836 fn test_cli_parse_alias_delete() {
2837 let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2838 assert!(matches!(
2839 &cli.command,
2840 Some(Commands::Delete { name }) if name == "my-session"
2841 ));
2842 }
2843
2844 #[test]
2845 fn test_cli_parse_alias_resume() {
2846 let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
2847 assert!(matches!(
2848 &cli.command,
2849 Some(Commands::Resume { name }) if name == "my-session"
2850 ));
2851 }
2852
2853 #[test]
2854 fn test_cli_parse_alias_nodes() {
2855 let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
2856 assert!(matches!(&cli.command, Some(Commands::Nodes)));
2857 }
2858
2859 #[test]
2860 fn test_cli_parse_alias_interventions() {
2861 let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
2862 assert!(matches!(
2863 &cli.command,
2864 Some(Commands::Interventions { name }) if name == "my-session"
2865 ));
2866 }
2867
2868 #[test]
2869 fn test_api_error_json() {
2870 let err = api_error("{\"error\":\"session not found: foo\"}");
2871 assert_eq!(err.to_string(), "session not found: foo");
2872 }
2873
2874 #[test]
2875 fn test_api_error_plain_text() {
2876 let err = api_error("plain text error");
2877 assert_eq!(err.to_string(), "plain text error");
2878 }
2879
2880 #[test]
2883 fn test_diff_output_empty_prev() {
2884 assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
2885 }
2886
2887 #[test]
2888 fn test_diff_output_identical() {
2889 assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
2890 }
2891
2892 #[test]
2893 fn test_diff_output_new_lines_appended() {
2894 let prev = "line1\nline2";
2895 let new = "line1\nline2\nline3\nline4";
2896 assert_eq!(diff_output(prev, new), "line3\nline4");
2897 }
2898
2899 #[test]
2900 fn test_diff_output_scrolled_window() {
2901 let prev = "line1\nline2\nline3";
2903 let new = "line2\nline3\nline4";
2904 assert_eq!(diff_output(prev, new), "line4");
2905 }
2906
2907 #[test]
2908 fn test_diff_output_completely_different() {
2909 let prev = "aaa\nbbb";
2910 let new = "xxx\nyyy";
2911 assert_eq!(diff_output(prev, new), "xxx\nyyy");
2912 }
2913
2914 #[test]
2915 fn test_diff_output_last_line_matches_but_overlap_fails() {
2916 let prev = "aaa\ncommon";
2918 let new = "zzz\ncommon\nnew_line";
2919 assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
2923 }
2924
2925 #[test]
2926 fn test_diff_output_new_empty() {
2927 assert_eq!(diff_output("line1", ""), "");
2928 }
2929
2930 async fn start_follow_test_server() -> String {
2935 use axum::{Router, extract::Path, extract::Query, routing::get};
2936 use std::sync::Arc;
2937 use std::sync::atomic::{AtomicUsize, Ordering};
2938
2939 let call_count = Arc::new(AtomicUsize::new(0));
2940 let output_count = call_count.clone();
2941
2942 let app = Router::new()
2943 .route(
2944 "/api/v1/sessions/{id}/output",
2945 get(
2946 move |_path: Path<String>,
2947 _query: Query<std::collections::HashMap<String, String>>| {
2948 let count = output_count.clone();
2949 async move {
2950 let n = count.fetch_add(1, Ordering::SeqCst);
2951 let output = match n {
2952 0 => "line1\nline2".to_owned(),
2953 1 => "line1\nline2\nline3".to_owned(),
2954 _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
2955 };
2956 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2957 }
2958 },
2959 ),
2960 )
2961 .route(
2962 "/api/v1/sessions/{id}",
2963 get(|_path: Path<String>| async {
2964 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2965 }),
2966 );
2967
2968 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2969 let addr = listener.local_addr().unwrap();
2970 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2971 format!("http://127.0.0.1:{}", addr.port())
2972 }
2973
2974 #[tokio::test]
2975 async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
2976 let base = start_follow_test_server().await;
2977 let client = reqwest::Client::new();
2978 let mut buf = Vec::new();
2979
2980 follow_logs(&client, &base, "test", 100, None, &mut buf)
2981 .await
2982 .unwrap();
2983
2984 let output = String::from_utf8(buf).unwrap();
2985 assert!(output.contains("line1"));
2987 assert!(output.contains("line2"));
2988 assert!(output.contains("line3"));
2989 assert!(output.contains("line4"));
2990 assert!(output.contains("[pulpo] Agent exited"));
2991 }
2992
2993 #[tokio::test]
2994 async fn test_execute_logs_follow_success() {
2995 let base = start_follow_test_server().await;
2996 let node = base.strip_prefix("http://").unwrap().to_owned();
2998
2999 let cli = Cli {
3000 node,
3001 token: None,
3002 command: Some(Commands::Logs {
3003 name: "test".into(),
3004 lines: 100,
3005 follow: true,
3006 }),
3007 path: None,
3008 };
3009 let result = execute(&cli).await.unwrap();
3011 assert_eq!(result, "");
3012 }
3013
3014 #[tokio::test]
3015 async fn test_execute_logs_follow_connection_refused() {
3016 let cli = Cli {
3017 node: "localhost:1".into(),
3018 token: None,
3019 command: Some(Commands::Logs {
3020 name: "test".into(),
3021 lines: 50,
3022 follow: true,
3023 }),
3024 path: None,
3025 };
3026 let result = execute(&cli).await;
3027 let err = result.unwrap_err().to_string();
3028 assert!(
3029 err.contains("Could not connect to pulpod"),
3030 "Expected friendly error, got: {err}"
3031 );
3032 }
3033
3034 #[tokio::test]
3035 async fn test_follow_logs_exits_on_dead() {
3036 use axum::{Router, extract::Path, extract::Query, routing::get};
3037
3038 let app = Router::new()
3039 .route(
3040 "/api/v1/sessions/{id}/output",
3041 get(
3042 |_path: Path<String>,
3043 _query: Query<std::collections::HashMap<String, String>>| async {
3044 r#"{"output":"some output"}"#.to_owned()
3045 },
3046 ),
3047 )
3048 .route(
3049 "/api/v1/sessions/{id}",
3050 get(|_path: Path<String>| async {
3051 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3052 }),
3053 );
3054
3055 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3056 let addr = listener.local_addr().unwrap();
3057 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3058 let base = format!("http://127.0.0.1:{}", addr.port());
3059
3060 let client = reqwest::Client::new();
3061 let mut buf = Vec::new();
3062 follow_logs(&client, &base, "test", 100, None, &mut buf)
3063 .await
3064 .unwrap();
3065
3066 let output = String::from_utf8(buf).unwrap();
3067 assert!(output.contains("some output"));
3068 }
3069
3070 #[tokio::test]
3071 async fn test_follow_logs_exits_on_stale() {
3072 use axum::{Router, extract::Path, extract::Query, routing::get};
3073
3074 let app = Router::new()
3075 .route(
3076 "/api/v1/sessions/{id}/output",
3077 get(
3078 |_path: Path<String>,
3079 _query: Query<std::collections::HashMap<String, String>>| async {
3080 r#"{"output":"stale output"}"#.to_owned()
3081 },
3082 ),
3083 )
3084 .route(
3085 "/api/v1/sessions/{id}",
3086 get(|_path: Path<String>| async {
3087 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3088 }),
3089 );
3090
3091 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3092 let addr = listener.local_addr().unwrap();
3093 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3094 let base = format!("http://127.0.0.1:{}", addr.port());
3095
3096 let client = reqwest::Client::new();
3097 let mut buf = Vec::new();
3098 follow_logs(&client, &base, "test", 100, None, &mut buf)
3099 .await
3100 .unwrap();
3101
3102 let output = String::from_utf8(buf).unwrap();
3103 assert!(output.contains("stale output"));
3104 }
3105
3106 #[tokio::test]
3107 async fn test_execute_logs_follow_non_reqwest_error() {
3108 use axum::{Router, extract::Path, extract::Query, routing::get};
3109
3110 let app = Router::new()
3112 .route(
3113 "/api/v1/sessions/{id}/output",
3114 get(
3115 |_path: Path<String>,
3116 _query: Query<std::collections::HashMap<String, String>>| async {
3117 r#"{"output":"initial"}"#.to_owned()
3118 },
3119 ),
3120 )
3121 .route(
3122 "/api/v1/sessions/{id}",
3123 get(|_path: Path<String>| async { "not valid json".to_owned() }),
3124 );
3125
3126 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3127 let addr = listener.local_addr().unwrap();
3128 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3129 let node = format!("127.0.0.1:{}", addr.port());
3130
3131 let cli = Cli {
3132 node,
3133 token: None,
3134 command: Some(Commands::Logs {
3135 name: "test".into(),
3136 lines: 100,
3137 follow: true,
3138 }),
3139 path: None,
3140 };
3141 let err = execute(&cli).await.unwrap_err();
3142 let msg = err.to_string();
3144 assert!(
3145 msg.contains("expected ident"),
3146 "Expected serde parse error, got: {msg}"
3147 );
3148 }
3149
3150 #[tokio::test]
3151 async fn test_fetch_session_status_connection_error() {
3152 let client = reqwest::Client::new();
3153 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3154 assert!(result.is_err());
3155 }
3156
3157 #[test]
3160 fn test_format_schedules_empty() {
3161 assert_eq!(format_schedules(&[]), "No schedules.");
3162 }
3163
3164 #[test]
3165 fn test_format_schedules_with_entries() {
3166 let schedules = vec![serde_json::json!({
3167 "name": "nightly",
3168 "cron": "0 3 * * *",
3169 "enabled": true,
3170 "last_run_at": null,
3171 "target_node": null
3172 })];
3173 let output = format_schedules(&schedules);
3174 assert!(output.contains("nightly"));
3175 assert!(output.contains("0 3 * * *"));
3176 assert!(output.contains("local"));
3177 assert!(output.contains("yes"));
3178 assert!(output.contains('-'));
3179 }
3180
3181 #[test]
3182 fn test_format_schedules_disabled_entry() {
3183 let schedules = vec![serde_json::json!({
3184 "name": "weekly",
3185 "cron": "0 0 * * 0",
3186 "enabled": false,
3187 "last_run_at": "2026-03-18T03:00:00Z",
3188 "target_node": "gpu-box"
3189 })];
3190 let output = format_schedules(&schedules);
3191 assert!(output.contains("weekly"));
3192 assert!(output.contains("no"));
3193 assert!(output.contains("gpu-box"));
3194 assert!(output.contains("2026-03-18T03:00"));
3195 }
3196
3197 #[test]
3198 fn test_format_schedules_header() {
3199 let schedules = vec![serde_json::json!({
3200 "name": "test",
3201 "cron": "* * * * *",
3202 "enabled": true,
3203 "last_run_at": null,
3204 "target_node": null
3205 })];
3206 let output = format_schedules(&schedules);
3207 assert!(output.contains("NAME"));
3208 assert!(output.contains("CRON"));
3209 assert!(output.contains("ENABLED"));
3210 assert!(output.contains("LAST RUN"));
3211 assert!(output.contains("NODE"));
3212 }
3213
3214 #[test]
3217 fn test_cli_parse_schedule_add() {
3218 let cli = Cli::try_parse_from([
3219 "pulpo",
3220 "schedule",
3221 "add",
3222 "nightly",
3223 "0 3 * * *",
3224 "--workdir",
3225 "/repo",
3226 "--",
3227 "claude",
3228 "-p",
3229 "review",
3230 ])
3231 .unwrap();
3232 assert!(matches!(
3233 &cli.command,
3234 Some(Commands::Schedule {
3235 action: ScheduleAction::Add { name, cron, .. }
3236 }) if name == "nightly" && cron == "0 3 * * *"
3237 ));
3238 }
3239
3240 #[test]
3241 fn test_cli_parse_schedule_add_with_node() {
3242 let cli = Cli::try_parse_from([
3243 "pulpo",
3244 "schedule",
3245 "add",
3246 "nightly",
3247 "0 3 * * *",
3248 "--workdir",
3249 "/repo",
3250 "--node",
3251 "gpu-box",
3252 "--",
3253 "claude",
3254 ])
3255 .unwrap();
3256 assert!(matches!(
3257 &cli.command,
3258 Some(Commands::Schedule {
3259 action: ScheduleAction::Add { node, .. }
3260 }) if node.as_deref() == Some("gpu-box")
3261 ));
3262 }
3263
3264 #[test]
3265 fn test_cli_parse_schedule_add_install_alias() {
3266 let cli =
3267 Cli::try_parse_from(["pulpo", "schedule", "install", "nightly", "0 3 * * *"]).unwrap();
3268 assert!(matches!(
3269 &cli.command,
3270 Some(Commands::Schedule {
3271 action: ScheduleAction::Add { name, .. }
3272 }) if name == "nightly"
3273 ));
3274 }
3275
3276 #[test]
3277 fn test_cli_parse_schedule_list() {
3278 let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3279 assert!(matches!(
3280 &cli.command,
3281 Some(Commands::Schedule {
3282 action: ScheduleAction::List
3283 })
3284 ));
3285 }
3286
3287 #[test]
3288 fn test_cli_parse_schedule_remove() {
3289 let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3290 assert!(matches!(
3291 &cli.command,
3292 Some(Commands::Schedule {
3293 action: ScheduleAction::Remove { name }
3294 }) if name == "nightly"
3295 ));
3296 }
3297
3298 #[test]
3299 fn test_cli_parse_schedule_pause() {
3300 let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3301 assert!(matches!(
3302 &cli.command,
3303 Some(Commands::Schedule {
3304 action: ScheduleAction::Pause { name }
3305 }) if name == "nightly"
3306 ));
3307 }
3308
3309 #[test]
3310 fn test_cli_parse_schedule_resume() {
3311 let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3312 assert!(matches!(
3313 &cli.command,
3314 Some(Commands::Schedule {
3315 action: ScheduleAction::Resume { name }
3316 }) if name == "nightly"
3317 ));
3318 }
3319
3320 #[test]
3321 fn test_cli_parse_schedule_alias() {
3322 let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3323 assert!(matches!(
3324 &cli.command,
3325 Some(Commands::Schedule {
3326 action: ScheduleAction::List
3327 })
3328 ));
3329 }
3330
3331 #[test]
3332 fn test_cli_parse_schedule_list_alias() {
3333 let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3334 assert!(matches!(
3335 &cli.command,
3336 Some(Commands::Schedule {
3337 action: ScheduleAction::List
3338 })
3339 ));
3340 }
3341
3342 #[test]
3343 fn test_cli_parse_schedule_remove_alias() {
3344 let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3345 assert!(matches!(
3346 &cli.command,
3347 Some(Commands::Schedule {
3348 action: ScheduleAction::Remove { name }
3349 }) if name == "nightly"
3350 ));
3351 }
3352
3353 #[cfg(not(coverage))]
3354 #[tokio::test]
3355 async fn test_execute_schedule_list_via_execute() {
3356 let node = start_test_server().await;
3357 let cli = Cli {
3358 node,
3359 token: None,
3360 command: Some(Commands::Schedule {
3361 action: ScheduleAction::List,
3362 }),
3363 path: None,
3364 };
3365 let result = execute(&cli).await.unwrap();
3366 assert_eq!(result, "No schedules.");
3367 }
3368
3369 #[test]
3370 fn test_schedule_action_debug() {
3371 let action = ScheduleAction::List;
3372 assert_eq!(format!("{action:?}"), "List");
3373 }
3374
3375 #[test]
3376 fn test_cli_parse_send_alias() {
3377 let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
3378 assert!(matches!(
3379 &cli.command,
3380 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
3381 ));
3382 }
3383
3384 #[test]
3385 fn test_cli_parse_spawn_no_name() {
3386 let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
3387 assert!(matches!(
3388 &cli.command,
3389 Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
3390 ));
3391 }
3392
3393 #[test]
3394 fn test_cli_parse_spawn_optional_name_with_command() {
3395 let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
3396 assert!(matches!(
3397 &cli.command,
3398 Some(Commands::Spawn { name, command, .. })
3399 if name.is_none() && command == &["echo", "hello"]
3400 ));
3401 }
3402
3403 #[test]
3404 fn test_cli_parse_path_shortcut() {
3405 let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
3406 assert!(cli.command.is_none());
3407 assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
3408 }
3409
3410 #[test]
3411 fn test_cli_parse_no_args() {
3412 let cli = Cli::try_parse_from(["pulpo"]).unwrap();
3413 assert!(cli.command.is_none());
3414 assert!(cli.path.is_none());
3415 }
3416
3417 #[test]
3418 fn test_derive_session_name_simple() {
3419 assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
3420 }
3421
3422 #[test]
3423 fn test_derive_session_name_with_special_chars() {
3424 assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
3425 }
3426
3427 #[test]
3428 fn test_derive_session_name_root() {
3429 assert_eq!(derive_session_name("/"), "session");
3430 }
3431
3432 #[test]
3433 fn test_derive_session_name_dots() {
3434 assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
3435 }
3436
3437 #[test]
3438 fn test_resolve_path_absolute() {
3439 assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
3440 }
3441
3442 #[test]
3443 fn test_resolve_path_relative() {
3444 let resolved = resolve_path("my-repo");
3445 assert!(resolved.ends_with("my-repo"));
3446 assert!(resolved.starts_with('/'));
3447 }
3448
3449 #[tokio::test]
3450 async fn test_execute_path_shortcut() {
3451 let node = start_test_server().await;
3452 let cli = Cli {
3453 node,
3454 token: None,
3455 path: Some("/tmp".into()),
3456 command: None,
3457 };
3458 let result = execute(&cli).await.unwrap();
3459 assert!(result.contains("Detached from session"));
3460 }
3461
3462 #[test]
3465 fn test_node_needs_resolution() {
3466 assert!(!node_needs_resolution("localhost:7433"));
3467 assert!(!node_needs_resolution("mac-mini:7433"));
3468 assert!(!node_needs_resolution("10.0.0.1:7433"));
3469 assert!(!node_needs_resolution("[::1]:7433"));
3470 assert!(node_needs_resolution("mac-mini"));
3471 assert!(node_needs_resolution("linux-server"));
3472 assert!(node_needs_resolution("localhost"));
3473 }
3474
3475 #[tokio::test]
3476 async fn test_resolve_node_with_port() {
3477 let client = reqwest::Client::new();
3478 let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
3479 assert_eq!(addr, "mac-mini:7433");
3480 assert!(token.is_none());
3481 }
3482
3483 #[tokio::test]
3484 async fn test_resolve_node_fallback_appends_port() {
3485 let client = reqwest::Client::new();
3488 let (addr, token) = resolve_node(&client, "unknown-host").await;
3489 assert_eq!(addr, "unknown-host:7433");
3490 assert!(token.is_none());
3491 }
3492
3493 #[cfg(not(coverage))]
3494 #[tokio::test]
3495 async fn test_resolve_node_finds_peer() {
3496 use axum::{Router, routing::get};
3497
3498 let app = Router::new()
3499 .route(
3500 "/api/v1/peers",
3501 get(|| async {
3502 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
3503 }),
3504 )
3505 .route(
3506 "/api/v1/config",
3507 get(|| async {
3508 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
3509 }),
3510 );
3511
3512 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
3514 return;
3515 };
3516 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3517
3518 let client = reqwest::Client::new();
3519 let (addr, token) = resolve_node(&client, "mac-mini").await;
3520 assert_eq!(addr, "10.0.0.5:7433");
3521 assert_eq!(token, Some("peer-secret".into()));
3522 }
3523
3524 #[cfg(not(coverage))]
3525 #[tokio::test]
3526 async fn test_resolve_node_peer_no_token() {
3527 use axum::{Router, routing::get};
3528
3529 let app = Router::new()
3530 .route(
3531 "/api/v1/peers",
3532 get(|| async {
3533 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
3534 }),
3535 )
3536 .route(
3537 "/api/v1/config",
3538 get(|| async {
3539 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
3540 }),
3541 );
3542
3543 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
3544 return; };
3546 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3547
3548 let client = reqwest::Client::new();
3549 let (addr, token) = resolve_node(&client, "test-peer").await;
3550 assert_eq!(addr, "10.0.0.9:7433");
3551 assert!(token.is_none()); }
3553
3554 #[tokio::test]
3555 async fn test_execute_with_peer_name_resolution() {
3556 let cli = Cli {
3560 node: "nonexistent-peer".into(),
3561 token: None,
3562 command: Some(Commands::List),
3563 path: None,
3564 };
3565 let result = execute(&cli).await;
3566 assert!(result.is_err());
3568 }
3569
3570 #[test]
3573 fn test_cli_parse_spawn_auto() {
3574 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
3575 assert!(matches!(
3576 &cli.command,
3577 Some(Commands::Spawn { auto, .. }) if *auto
3578 ));
3579 }
3580
3581 #[test]
3582 fn test_cli_parse_spawn_auto_default() {
3583 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
3584 assert!(matches!(
3585 &cli.command,
3586 Some(Commands::Spawn { auto, .. }) if !auto
3587 ));
3588 }
3589
3590 #[tokio::test]
3591 async fn test_select_best_node_coverage_stub() {
3592 let client = reqwest::Client::new();
3594 let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
3597 }
3598
3599 #[cfg(not(coverage))]
3600 #[tokio::test]
3601 async fn test_select_best_node_picks_least_loaded() {
3602 use axum::{Router, routing::get};
3603
3604 let app = Router::new().route(
3605 "/api/v1/peers",
3606 get(|| async {
3607 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
3608 }),
3609 );
3610 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3611 let addr = listener.local_addr().unwrap();
3612 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3613 let base = format!("http://127.0.0.1:{}", addr.port());
3614
3615 let client = reqwest::Client::new();
3616 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3617 assert_eq!(name, "idle");
3621 assert_eq!(addr, "idle:7433");
3622 }
3623
3624 #[cfg(not(coverage))]
3625 #[tokio::test]
3626 async fn test_select_best_node_no_online_peers_falls_back_to_local() {
3627 use axum::{Router, routing::get};
3628
3629 let app = Router::new().route(
3630 "/api/v1/peers",
3631 get(|| async {
3632 r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
3633 }),
3634 );
3635 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3636 let addr = listener.local_addr().unwrap();
3637 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3638 let base = format!("http://127.0.0.1:{}", addr.port());
3639
3640 let client = reqwest::Client::new();
3641 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3642 assert_eq!(name, "my-mac");
3643 assert_eq!(addr, "localhost:7433");
3644 }
3645
3646 #[cfg(not(coverage))]
3647 #[tokio::test]
3648 async fn test_select_best_node_empty_peers_falls_back_to_local() {
3649 use axum::{Router, routing::get};
3650
3651 let app = Router::new().route(
3652 "/api/v1/peers",
3653 get(|| async {
3654 r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
3655 }),
3656 );
3657 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3658 let addr = listener.local_addr().unwrap();
3659 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3660 let base = format!("http://127.0.0.1:{}", addr.port());
3661
3662 let client = reqwest::Client::new();
3663 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3664 assert_eq!(name, "solo");
3665 assert_eq!(addr, "localhost:7433");
3666 }
3667
3668 #[cfg(not(coverage))]
3669 #[tokio::test]
3670 async fn test_execute_spawn_auto_selects_node() {
3671 use axum::{
3672 Router,
3673 http::StatusCode,
3674 routing::{get, post},
3675 };
3676
3677 let create_json = test_create_response_json();
3678
3679 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3681 let addr = listener.local_addr().unwrap();
3682 let node = format!("127.0.0.1:{}", addr.port());
3683 let peer_addr = node.clone();
3684
3685 let app = Router::new()
3686 .route(
3687 "/api/v1/peers",
3688 get(move || {
3689 let peer_addr = peer_addr.clone();
3690 async move {
3691 format!(
3692 r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
3693 )
3694 }
3695 }),
3696 )
3697 .route(
3698 "/api/v1/sessions",
3699 post(move || async move {
3700 (StatusCode::CREATED, create_json.clone())
3701 }),
3702 );
3703
3704 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3705
3706 let cli = Cli {
3707 node,
3708 token: None,
3709 command: Some(Commands::Spawn {
3710 name: Some("test".into()),
3711 workdir: Some("/tmp/repo".into()),
3712 ink: None,
3713 description: None,
3714 detach: true,
3715 idle_threshold: None,
3716 auto: true,
3717 worktree: false,
3718 sandbox: false,
3719 command: vec!["echo".into(), "hello".into()],
3720 }),
3721 path: None,
3722 };
3723 let result = execute(&cli).await.unwrap();
3724 assert!(result.contains("Created session"));
3725 }
3726
3727 #[cfg(not(coverage))]
3728 #[tokio::test]
3729 async fn test_select_best_node_peer_no_session_count() {
3730 use axum::{Router, routing::get};
3731
3732 let app = Router::new().route(
3733 "/api/v1/peers",
3734 get(|| async {
3735 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
3736 }),
3737 );
3738 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3739 let addr = listener.local_addr().unwrap();
3740 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3741 let base = format!("http://127.0.0.1:{}", addr.port());
3742
3743 let client = reqwest::Client::new();
3744 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3745 assert_eq!(name, "fresh");
3747 assert_eq!(addr, "fresh:7433");
3748 }
3749}