1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4 AuthTokenResponse, ConfigResponse, CreateSessionResponse, InterventionEventResponse,
5 PeersResponse,
6};
7use pulpo_common::session::Session;
8
9#[derive(Parser, Debug)]
10#[command(
11 name = "pulpo",
12 about = "Manage agent sessions across your machines",
13 version = env!("PULPO_VERSION"),
14 args_conflicts_with_subcommands = true
15)]
16pub struct Cli {
17 #[arg(long, default_value = "localhost:7433")]
19 pub node: String,
20
21 #[arg(long)]
23 pub token: Option<String>,
24
25 #[command(subcommand)]
26 pub command: Option<Commands>,
27
28 #[arg(value_name = "PATH")]
30 pub path: Option<String>,
31}
32
33#[derive(Subcommand, Debug)]
34#[allow(clippy::large_enum_variant)]
35pub enum Commands {
36 #[command(visible_alias = "a")]
38 Attach {
39 name: String,
41 },
42
43 #[command(visible_alias = "i", visible_alias = "send")]
45 Input {
46 name: String,
48 text: Option<String>,
50 },
51
52 #[command(visible_alias = "s")]
54 Spawn {
55 name: Option<String>,
57
58 #[arg(long)]
60 workdir: Option<String>,
61
62 #[arg(long)]
64 ink: Option<String>,
65
66 #[arg(long)]
68 description: Option<String>,
69
70 #[arg(short, long)]
72 detach: bool,
73
74 #[arg(long)]
76 idle_threshold: Option<u32>,
77
78 #[arg(long)]
80 auto: bool,
81
82 #[arg(last = true)]
84 command: Vec<String>,
85 },
86
87 #[command(visible_alias = "ls")]
89 List,
90
91 #[command(visible_alias = "l")]
93 Logs {
94 name: String,
96
97 #[arg(long, default_value = "100")]
99 lines: usize,
100
101 #[arg(short, long)]
103 follow: bool,
104 },
105
106 #[command(visible_alias = "k")]
108 Kill {
109 name: String,
111 },
112
113 #[command(visible_alias = "rm")]
115 Delete {
116 name: String,
118 },
119
120 #[command(visible_alias = "r")]
122 Resume {
123 name: String,
125 },
126
127 #[command(visible_alias = "n")]
129 Nodes,
130
131 #[command(visible_alias = "iv")]
133 Interventions {
134 name: String,
136 },
137
138 Ui,
140
141 #[command(visible_alias = "sched")]
143 Schedule {
144 #[command(subcommand)]
145 action: ScheduleAction,
146 },
147}
148
149#[derive(Subcommand, Debug)]
150pub enum ScheduleAction {
151 Install {
153 name: String,
155 cron: String,
157 #[arg(long)]
159 workdir: String,
160 #[arg(last = true)]
162 command: Vec<String>,
163 },
164 #[command(alias = "ls")]
166 List,
167 #[command(alias = "rm")]
169 Remove {
170 name: String,
172 },
173 Pause {
175 name: String,
177 },
178 Resume {
180 name: String,
182 },
183}
184
/// Text whose presence in fetched session output makes `follow_logs` stop polling.
const AGENT_EXIT_MARKER: &str = "[pulpo] Agent exited";
187
/// Turns `path` into an absolute path string.
///
/// Absolute inputs are returned unchanged. Relative inputs are joined onto the
/// current working directory; if the working directory cannot be read, the
/// input is returned as given.
fn resolve_path(path: &str) -> String {
    let candidate = std::path::Path::new(path);
    if !candidate.is_absolute() {
        if let Ok(cwd) = std::env::current_dir() {
            return cwd.join(candidate).to_string_lossy().into_owned();
        }
    }
    path.to_owned()
}
200
/// Builds a kebab-case session name from the last component of `path`.
///
/// ASCII letters and digits are lowercased; every other character becomes a
/// dash, runs of dashes collapse to one, and leading/trailing dashes are
/// dropped. Falls back to `"session"` when the path has no usable final
/// component or nothing survives the cleanup.
fn derive_session_name(path: &str) -> String {
    let stem = std::path::Path::new(path)
        .file_name()
        .and_then(|component| component.to_str())
        .unwrap_or("session");

    // Single pass: emit a dash only when the slug does not already end in one.
    let mut slug = String::with_capacity(stem.len());
    for ch in stem.chars() {
        if ch.is_ascii_alphanumeric() {
            slug.push(ch.to_ascii_lowercase());
        } else if !slug.ends_with('-') {
            slug.push('-');
        }
    }

    match slug.trim_matches('-') {
        "" => "session".to_owned(),
        trimmed => trimmed.to_owned(),
    }
}
233
234async fn deduplicate_session_name(
236 client: &reqwest::Client,
237 base: &str,
238 name: &str,
239 token: Option<&str>,
240) -> String {
241 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
243 .send()
244 .await;
245 match resp {
246 Ok(r) if r.status().is_success() => {
247 for i in 2..=99 {
249 let candidate = format!("{name}-{i}");
250 let resp = authed_get(client, format!("{base}/api/v1/sessions/{candidate}"), token)
251 .send()
252 .await;
253 match resp {
254 Ok(r) if r.status().is_success() => {}
255 _ => return candidate,
256 }
257 }
258 format!("{name}-100")
259 }
260 _ => name.to_owned(),
261 }
262}
263
/// Returns the HTTP base URL for `node` by prefixing it with `http://`.
pub fn base_url(node: &str) -> String {
    let mut url = String::from("http://");
    url.push_str(node);
    url
}
268
/// JSON body deserialized by `fetch_output` from the session output endpoint.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// Captured output text, returned to the caller as-is.
    output: String,
}
274
275fn format_sessions(sessions: &[Session]) -> String {
277 if sessions.is_empty() {
278 return "No sessions.".into();
279 }
280 let mut lines = vec![format!("{:<20} {:<12} {}", "NAME", "STATUS", "COMMAND")];
281 for s in sessions {
282 let cmd_display = if s.command.len() > 50 {
283 format!("{}...", &s.command[..47])
284 } else {
285 s.command.clone()
286 };
287 lines.push(format!("{:<20} {:<12} {}", s.name, s.status, cmd_display));
288 }
289 lines.join("\n")
290}
291
292fn format_nodes(resp: &PeersResponse) -> String {
294 let mut lines = vec![format!(
295 "{:<20} {:<25} {:<10} {}",
296 "NAME", "ADDRESS", "STATUS", "SESSIONS"
297 )];
298 lines.push(format!(
299 "{:<20} {:<25} {:<10} {}",
300 resp.local.name, "(local)", "online", "-"
301 ));
302 for p in &resp.peers {
303 let sessions = p
304 .session_count
305 .map_or_else(|| "-".into(), |c| c.to_string());
306 lines.push(format!(
307 "{:<20} {:<25} {:<10} {}",
308 p.name, p.address, p.status, sessions
309 ));
310 }
311 lines.join("\n")
312}
313
314fn format_interventions(events: &[InterventionEventResponse]) -> String {
316 if events.is_empty() {
317 return "No intervention events.".into();
318 }
319 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
320 for e in events {
321 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
322 }
323 lines.join("\n")
324}
325
/// Builds (without running) the command that opens `url` in a browser:
/// `open` on macOS, `xdg-open` on every other target.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    let opener = if cfg!(target_os = "macos") {
        "open"
    } else {
        "xdg-open"
    };
    let mut cmd = std::process::Command::new(opener);
    cmd.arg(url);
    cmd
}
349
350#[cfg(not(coverage))]
352fn open_browser(url: &str) -> Result<()> {
353 build_open_command(url).status()?;
354 Ok(())
355}
356
/// Coverage-build stand-in for `open_browser`: launches nothing and always succeeds.
#[cfg(coverage)]
fn open_browser(_url: &str) -> Result<()> {
    Ok(())
}
362
/// Builds (without running) `tmux attach-session -t <backend_session_id>`.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    let mut tmux = std::process::Command::new("tmux");
    tmux.arg("attach-session").arg("-t").arg(backend_session_id);
    tmux
}
371
372#[cfg(not(any(test, coverage)))]
374fn attach_session(backend_session_id: &str) -> Result<()> {
375 let status = build_attach_command(backend_session_id).status()?;
376 if !status.success() {
377 anyhow::bail!("attach failed with {status}");
378 }
379 Ok(())
380}
381
/// Test/coverage stand-in for `attach_session`: attaches to nothing and always succeeds.
#[cfg(any(test, coverage))]
#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    Ok(())
}
388
389fn api_error(text: &str) -> anyhow::Error {
391 serde_json::from_str::<serde_json::Value>(text)
392 .ok()
393 .and_then(|v| v["error"].as_str().map(String::from))
394 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
395}
396
397async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
399 if resp.status().is_success() {
400 Ok(resp.text().await?)
401 } else {
402 let text = resp.text().await?;
403 Err(api_error(&text))
404 }
405}
406
407fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
409 if err.is_connect() {
410 anyhow::anyhow!(
411 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
412 )
413 } else {
414 anyhow::anyhow!("Network error connecting to {node}: {err}")
415 }
416}
417
/// Reports whether `node` refers to the local machine.
///
/// Matches `localhost` and `127.0.0.1` (with or without a `:port` suffix),
/// the bare IPv6 loopback `::1`, and anything starting with `[::1]`.
fn is_localhost(node: &str) -> bool {
    // Text before the first ':' (the whole string when there is none).
    let host = node.split_once(':').map_or(node, |(head, _)| head);
    matches!(host, "localhost" | "127.0.0.1") || node == "::1" || node.starts_with("[::1]")
}
423
424async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
426 let resp = client
427 .get(format!("{base}/api/v1/auth/token"))
428 .send()
429 .await
430 .ok()?;
431 let body: AuthTokenResponse = resp.json().await.ok()?;
432 if body.token.is_empty() {
433 None
434 } else {
435 Some(body.token)
436 }
437}
438
439async fn resolve_token(
441 client: &reqwest::Client,
442 base: &str,
443 node: &str,
444 explicit: Option<&str>,
445) -> Option<String> {
446 if let Some(t) = explicit {
447 return Some(t.to_owned());
448 }
449 if is_localhost(node) {
450 return discover_token(client, base).await;
451 }
452 None
453}
454
/// Returns `true` when `node` contains no `:`, i.e. it is treated as a bare
/// name that still has to be turned into an address.
fn node_needs_resolution(node: &str) -> bool {
    node.find(':').is_none()
}
459
460#[cfg(not(coverage))]
467async fn resolve_node(client: &reqwest::Client, node: &str) -> (String, Option<String>) {
468 if !node_needs_resolution(node) {
470 return (node.to_owned(), None);
471 }
472
473 let local_base = "http://localhost:7433";
475 let mut resolved_address: Option<String> = None;
476
477 if let Ok(resp) = client
478 .get(format!("{local_base}/api/v1/peers"))
479 .send()
480 .await
481 && let Ok(peers_resp) = resp.json::<PeersResponse>().await
482 {
483 for peer in &peers_resp.peers {
484 if peer.name == node {
485 resolved_address = Some(peer.address.clone());
486 break;
487 }
488 }
489 }
490
491 let address = resolved_address.unwrap_or_else(|| format!("{node}:7433"));
492
493 let peer_token = if let Ok(resp) = client
495 .get(format!("{local_base}/api/v1/config"))
496 .send()
497 .await
498 && let Ok(config) = resp.json::<ConfigResponse>().await
499 && let Some(entry) = config.peers.get(node)
500 {
501 entry.token().map(String::from)
502 } else {
503 None
504 };
505
506 (address, peer_token)
507}
508
/// Coverage-build variant of `resolve_node` that performs no network calls:
/// bare names get the default port `7433` appended, other values pass through,
/// and no token is ever returned.
#[cfg(coverage)]
async fn resolve_node(_client: &reqwest::Client, node: &str) -> (String, Option<String>) {
    let address = if node_needs_resolution(node) {
        format!("{node}:7433")
    } else {
        node.to_owned()
    };
    (address, None)
}
518
519fn authed_get(
521 client: &reqwest::Client,
522 url: String,
523 token: Option<&str>,
524) -> reqwest::RequestBuilder {
525 let req = client.get(url);
526 if let Some(t) = token {
527 req.bearer_auth(t)
528 } else {
529 req
530 }
531}
532
533fn authed_post(
535 client: &reqwest::Client,
536 url: String,
537 token: Option<&str>,
538) -> reqwest::RequestBuilder {
539 let req = client.post(url);
540 if let Some(t) = token {
541 req.bearer_auth(t)
542 } else {
543 req
544 }
545}
546
547fn authed_delete(
549 client: &reqwest::Client,
550 url: String,
551 token: Option<&str>,
552) -> reqwest::RequestBuilder {
553 let req = client.delete(url);
554 if let Some(t) = token {
555 req.bearer_auth(t)
556 } else {
557 req
558 }
559}
560
561async fn fetch_output(
563 client: &reqwest::Client,
564 base: &str,
565 name: &str,
566 lines: usize,
567 token: Option<&str>,
568) -> Result<String> {
569 let resp = authed_get(
570 client,
571 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
572 token,
573 )
574 .send()
575 .await?;
576 let text = ok_or_api_error(resp).await?;
577 let output: OutputResponse = serde_json::from_str(&text)?;
578 Ok(output.output)
579}
580
581async fn fetch_session_status(
583 client: &reqwest::Client,
584 base: &str,
585 name: &str,
586 token: Option<&str>,
587) -> Result<String> {
588 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
589 .send()
590 .await?;
591 let text = ok_or_api_error(resp).await?;
592 let session: Session = serde_json::from_str(&text)?;
593 Ok(session.status.to_string())
594}
595
/// Returns the part of `new` that comes after the content already shown in `prev`.
///
/// Both arguments are treated as snapshots of a scrolling line buffer. The
/// function searches `new`, from the end, for a line equal to the last line of
/// `prev`, and accepts that position only if the lines before it also match
/// the tail of `prev` (as many lines as are available on both sides).
/// Everything after the matched line is returned.
///
/// * `prev` empty: all of `new` is returned.
/// * `new` has no lines: `""` is returned.
/// * No overlap found: all of `new` is returned.
///
/// The returned slice borrows from `new`. Its start is computed from the real
/// line segments including their terminators, so `\r\n` line endings are
/// handled the same as `\n`.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // A non-empty string always yields at least one line, so the `else`
    // branch is only a defensive fallback.
    let Some(&last_prev) = prev_lines.last() else {
        return new;
    };

    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Compare as many trailing lines as both snapshots can provide.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail != new_overlap {
            continue;
        }
        // `split_inclusive('\n')` yields the same segments as `lines()` but
        // keeps the terminators, so the summed length is the exact byte offset
        // of the first unseen line (or `new.len()` when nothing is new).
        let consumed: usize = new
            .split_inclusive('\n')
            .take(i + 1)
            .map(str::len)
            .sum();
        return &new[consumed..];
    }

    new
}
638
639async fn follow_logs(
641 client: &reqwest::Client,
642 base: &str,
643 name: &str,
644 lines: usize,
645 token: Option<&str>,
646 writer: &mut (dyn std::io::Write + Send),
647) -> Result<()> {
648 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
649 write!(writer, "{prev_output}")?;
650
651 let mut unchanged_ticks: u32 = 0;
652
653 loop {
654 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
655
656 let new_output = fetch_output(client, base, name, lines, token).await?;
658
659 let diff = diff_output(&prev_output, &new_output);
660 if diff.is_empty() {
661 unchanged_ticks += 1;
662 } else {
663 write!(writer, "{diff}")?;
664 unchanged_ticks = 0;
665 }
666
667 if new_output.contains(AGENT_EXIT_MARKER) {
669 break;
670 }
671
672 prev_output = new_output;
673
674 if unchanged_ticks >= 3 {
676 let status = fetch_session_status(client, base, name, token).await?;
677 let is_terminal = status == "ready" || status == "killed" || status == "lost";
678 if is_terminal {
679 break;
680 }
681 }
682 }
683 Ok(())
684}
685
/// Prefix of the trailing `#pulpo:<name>` marker that `build_crontab_line`
/// appends to each crontab entry; the other crontab helpers use it to find
/// pulpo-managed lines.
#[cfg_attr(coverage, allow(dead_code))]
const CRONTAB_TAG: &str = "#pulpo:";
690
691#[cfg(not(coverage))]
693fn read_crontab() -> Result<String> {
694 let output = std::process::Command::new("crontab").arg("-l").output()?;
695 if output.status.success() {
696 Ok(String::from_utf8_lossy(&output.stdout).to_string())
697 } else {
698 Ok(String::new())
699 }
700}
701
702#[cfg(not(coverage))]
704fn write_crontab(content: &str) -> Result<()> {
705 use std::io::Write;
706 let mut child = std::process::Command::new("crontab")
707 .arg("-")
708 .stdin(std::process::Stdio::piped())
709 .spawn()?;
710 child
711 .stdin
712 .as_mut()
713 .unwrap()
714 .write_all(content.as_bytes())?;
715 let status = child.wait()?;
716 if !status.success() {
717 anyhow::bail!("crontab write failed");
718 }
719 Ok(())
720}
721
722#[cfg_attr(coverage, allow(dead_code))]
724fn build_crontab_line(name: &str, cron: &str, workdir: &str, command: &str, node: &str) -> String {
725 format!(
726 "{cron} pulpo --node {node} spawn {name} --workdir {workdir} -- {command} {CRONTAB_TAG}{name}\n"
727 )
728}
729
730#[cfg_attr(coverage, allow(dead_code))]
732fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
733 let tag = format!("{CRONTAB_TAG}{name}");
734 if crontab.contains(&tag) {
735 anyhow::bail!("schedule \"{name}\" already exists — remove it first");
736 }
737 let mut result = crontab.to_owned();
738 result.push_str(line);
739 Ok(result)
740}
741
742#[cfg_attr(coverage, allow(dead_code))]
744fn crontab_list(crontab: &str) -> String {
745 let entries: Vec<&str> = crontab
746 .lines()
747 .filter(|l| l.contains(CRONTAB_TAG))
748 .collect();
749 if entries.is_empty() {
750 return "No pulpo schedules.".into();
751 }
752 let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
753 for entry in entries {
754 let paused = entry.starts_with('#');
755 let raw = entry.trim_start_matches('#').trim();
756 let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
757 let parts: Vec<&str> = raw.splitn(6, ' ').collect();
758 let cron_expr = if parts.len() >= 5 {
759 parts[..5].join(" ")
760 } else {
761 "?".into()
762 };
763 lines.push(format!(
764 "{:<20} {:<15} {}",
765 name,
766 cron_expr,
767 if paused { "yes" } else { "no" }
768 ));
769 }
770 lines.join("\n")
771}
772
773#[cfg_attr(coverage, allow(dead_code))]
775fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
776 use std::fmt::Write;
777 let tag = format!("{CRONTAB_TAG}{name}");
778 let filtered =
779 crontab
780 .lines()
781 .filter(|l| !l.contains(&tag))
782 .fold(String::new(), |mut acc, l| {
783 writeln!(acc, "{l}").unwrap();
784 acc
785 });
786 if filtered.len() == crontab.len() {
787 anyhow::bail!("schedule \"{name}\" not found");
788 }
789 Ok(filtered)
790}
791
792#[cfg_attr(coverage, allow(dead_code))]
794fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
795 use std::fmt::Write;
796 let tag = format!("{CRONTAB_TAG}{name}");
797 let mut found = false;
798 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
799 if l.contains(&tag) && !l.starts_with('#') {
800 found = true;
801 writeln!(acc, "#{l}").unwrap();
802 } else {
803 writeln!(acc, "{l}").unwrap();
804 }
805 acc
806 });
807 if !found {
808 anyhow::bail!("schedule \"{name}\" not found or already paused");
809 }
810 Ok(updated)
811}
812
813#[cfg_attr(coverage, allow(dead_code))]
815fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
816 use std::fmt::Write;
817 let tag = format!("{CRONTAB_TAG}{name}");
818 let mut found = false;
819 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
820 if l.contains(&tag) && l.starts_with('#') {
821 found = true;
822 writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
823 } else {
824 writeln!(acc, "{l}").unwrap();
825 }
826 acc
827 });
828 if !found {
829 anyhow::bail!("schedule \"{name}\" not found or not paused");
830 }
831 Ok(updated)
832}
833
834#[cfg(not(coverage))]
836fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
837 match action {
838 ScheduleAction::Install {
839 name,
840 cron,
841 workdir,
842 command,
843 } => {
844 let crontab = read_crontab()?;
845 let joined_command = command.join(" ");
846 let line = build_crontab_line(name, cron, workdir, &joined_command, node);
847 let updated = crontab_install(&crontab, name, &line)?;
848 write_crontab(&updated)?;
849 Ok(format!("Installed schedule \"{name}\""))
850 }
851 ScheduleAction::List => {
852 let crontab = read_crontab()?;
853 Ok(crontab_list(&crontab))
854 }
855 ScheduleAction::Remove { name } => {
856 let crontab = read_crontab()?;
857 let updated = crontab_remove(&crontab, name)?;
858 write_crontab(&updated)?;
859 Ok(format!("Removed schedule \"{name}\""))
860 }
861 ScheduleAction::Pause { name } => {
862 let crontab = read_crontab()?;
863 let updated = crontab_pause(&crontab, name)?;
864 write_crontab(&updated)?;
865 Ok(format!("Paused schedule \"{name}\""))
866 }
867 ScheduleAction::Resume { name } => {
868 let crontab = read_crontab()?;
869 let updated = crontab_resume(&crontab, name)?;
870 write_crontab(&updated)?;
871 Ok(format!("Resumed schedule \"{name}\""))
872 }
873 }
874}
875
/// Coverage-build stand-in for `execute_schedule`: leaves the crontab alone
/// and returns an empty message.
#[cfg(coverage)]
fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
    Ok(String::new())
}
881
882#[cfg(not(coverage))]
886async fn select_best_node(
887 client: &reqwest::Client,
888 base: &str,
889 token: Option<&str>,
890) -> Result<(String, String)> {
891 let resp = authed_get(client, format!("{base}/api/v1/peers"), token)
892 .send()
893 .await?;
894 let text = ok_or_api_error(resp).await?;
895 let peers_resp: PeersResponse = serde_json::from_str(&text)?;
896
897 let mut best: Option<(String, String, f64)> = None; for peer in &peers_resp.peers {
901 if peer.status != pulpo_common::peer::PeerStatus::Online {
902 continue;
903 }
904 let sessions = peer.session_count.unwrap_or(0);
905 let mem = peer.node_info.as_ref().map_or(0, |n| n.memory_mb);
906 #[allow(clippy::cast_precision_loss)]
908 let score = sessions as f64 - (mem as f64 / 1024.0);
909 if best.as_ref().is_none_or(|(_, _, s)| score < *s) {
910 best = Some((peer.address.clone(), peer.name.clone(), score));
911 }
912 }
913
914 match best {
916 Some((addr, name, _)) => Ok((addr, name)),
917 None => Ok(("localhost:7433".into(), peers_resp.local.name)),
918 }
919}
920
/// Coverage-build stand-in for `select_best_node`: performs no network call
/// and always selects `localhost:7433` with the name `local`.
#[cfg(coverage)]
#[allow(clippy::unnecessary_wraps)]
async fn select_best_node(
    _client: &reqwest::Client,
    _base: &str,
    _token: Option<&str>,
) -> Result<(String, String)> {
    Ok(("localhost:7433".into(), "local".into()))
}
931
932#[allow(clippy::too_many_lines)]
934pub async fn execute(cli: &Cli) -> Result<String> {
935 let client = reqwest::Client::new();
936 let (resolved_node, peer_token) = resolve_node(&client, &cli.node).await;
937 let url = base_url(&resolved_node);
938 let node = &resolved_node;
939 let token = resolve_token(&client, &url, node, cli.token.as_deref())
940 .await
941 .or(peer_token);
942
943 if cli.command.is_none() {
945 let path = cli.path.as_deref().unwrap_or(".");
946 let resolved_workdir = resolve_path(path);
947 let base_name = derive_session_name(&resolved_workdir);
948 let name = deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await;
949 let body = serde_json::json!({
950 "name": name,
951 "workdir": resolved_workdir,
952 });
953 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
954 .json(&body)
955 .send()
956 .await
957 .map_err(|e| friendly_error(&e, node))?;
958 let text = ok_or_api_error(resp).await?;
959 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
960 let msg = format!(
961 "Created session \"{}\" ({})",
962 resp.session.name, resp.session.id
963 );
964 let backend_id = resp
965 .session
966 .backend_session_id
967 .as_deref()
968 .unwrap_or(&resp.session.name);
969 eprintln!("{msg}");
970 attach_session(backend_id)?;
971 return Ok(format!("Detached from session \"{}\".", resp.session.name));
972 }
973
974 match cli.command.as_ref().unwrap() {
975 Commands::Attach { name } => {
976 let resp = authed_get(
978 &client,
979 format!("{url}/api/v1/sessions/{name}"),
980 token.as_deref(),
981 )
982 .send()
983 .await
984 .map_err(|e| friendly_error(&e, node))?;
985 let text = ok_or_api_error(resp).await?;
986 let session: Session = serde_json::from_str(&text)?;
987 match session.status.to_string().as_str() {
988 "lost" => {
989 anyhow::bail!(
990 "Session \"{name}\" is lost (agent process died). Resume it first:\n pulpo resume {name}"
991 );
992 }
993 "killed" => {
994 anyhow::bail!(
995 "Session \"{name}\" is {} — cannot attach to a killed session.",
996 session.status
997 );
998 }
999 _ => {}
1000 }
1001 let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
1002 attach_session(&backend_id)?;
1003 Ok(format!("Detached from session {name}."))
1004 }
1005 Commands::Input { name, text } => {
1006 let input_text = text.as_deref().unwrap_or("\n");
1007 let body = serde_json::json!({ "text": input_text });
1008 let resp = authed_post(
1009 &client,
1010 format!("{url}/api/v1/sessions/{name}/input"),
1011 token.as_deref(),
1012 )
1013 .json(&body)
1014 .send()
1015 .await
1016 .map_err(|e| friendly_error(&e, node))?;
1017 ok_or_api_error(resp).await?;
1018 Ok(format!("Sent input to session {name}."))
1019 }
1020 Commands::List => {
1021 let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1022 .send()
1023 .await
1024 .map_err(|e| friendly_error(&e, node))?;
1025 let text = ok_or_api_error(resp).await?;
1026 let sessions: Vec<Session> = serde_json::from_str(&text)?;
1027 Ok(format_sessions(&sessions))
1028 }
1029 Commands::Nodes => {
1030 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
1031 .send()
1032 .await
1033 .map_err(|e| friendly_error(&e, node))?;
1034 let text = ok_or_api_error(resp).await?;
1035 let resp: PeersResponse = serde_json::from_str(&text)?;
1036 Ok(format_nodes(&resp))
1037 }
1038 Commands::Spawn {
1039 workdir,
1040 name,
1041 ink,
1042 description,
1043 detach,
1044 idle_threshold,
1045 auto,
1046 command,
1047 } => {
1048 let cmd = if command.is_empty() {
1049 None
1050 } else {
1051 Some(command.join(" "))
1052 };
1053 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1055 std::env::current_dir()
1056 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1057 });
1058 let resolved_name = if let Some(n) = name {
1060 n.clone()
1061 } else {
1062 let base_name = derive_session_name(&resolved_workdir);
1063 deduplicate_session_name(&client, &url, &base_name, token.as_deref()).await
1064 };
1065 let mut body = serde_json::json!({
1066 "name": resolved_name,
1067 "workdir": resolved_workdir,
1068 });
1069 if let Some(c) = &cmd {
1070 body["command"] = serde_json::json!(c);
1071 }
1072 if let Some(i) = ink {
1073 body["ink"] = serde_json::json!(i);
1074 }
1075 if let Some(d) = description {
1076 body["description"] = serde_json::json!(d);
1077 }
1078 if let Some(t) = idle_threshold {
1079 body["idle_threshold_secs"] = serde_json::json!(t);
1080 }
1081 let spawn_url = if *auto {
1082 let (auto_addr, auto_name) =
1083 select_best_node(&client, &url, token.as_deref()).await?;
1084 eprintln!("Auto-selected node: {auto_name} ({auto_addr})");
1085 base_url(&auto_addr)
1086 } else {
1087 url.clone()
1088 };
1089 let resp = authed_post(
1090 &client,
1091 format!("{spawn_url}/api/v1/sessions"),
1092 token.as_deref(),
1093 )
1094 .json(&body)
1095 .send()
1096 .await
1097 .map_err(|e| friendly_error(&e, node))?;
1098 let text = ok_or_api_error(resp).await?;
1099 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1100 let msg = format!(
1101 "Created session \"{}\" ({})",
1102 resp.session.name, resp.session.id
1103 );
1104 if !detach {
1105 let backend_id = resp
1106 .session
1107 .backend_session_id
1108 .as_deref()
1109 .unwrap_or(&resp.session.name);
1110 eprintln!("{msg}");
1111 attach_session(backend_id)?;
1112 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1113 }
1114 Ok(msg)
1115 }
1116 Commands::Kill { name } => {
1117 let resp = authed_post(
1118 &client,
1119 format!("{url}/api/v1/sessions/{name}/kill"),
1120 token.as_deref(),
1121 )
1122 .send()
1123 .await
1124 .map_err(|e| friendly_error(&e, node))?;
1125 ok_or_api_error(resp).await?;
1126 Ok(format!("Session {name} killed."))
1127 }
1128 Commands::Delete { name } => {
1129 let resp = authed_delete(
1130 &client,
1131 format!("{url}/api/v1/sessions/{name}"),
1132 token.as_deref(),
1133 )
1134 .send()
1135 .await
1136 .map_err(|e| friendly_error(&e, node))?;
1137 ok_or_api_error(resp).await?;
1138 Ok(format!("Session {name} deleted."))
1139 }
1140 Commands::Logs {
1141 name,
1142 lines,
1143 follow,
1144 } => {
1145 if *follow {
1146 let mut stdout = std::io::stdout();
1147 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1148 .await
1149 .map_err(|e| {
1150 match e.downcast::<reqwest::Error>() {
1152 Ok(re) => friendly_error(&re, node),
1153 Err(other) => other,
1154 }
1155 })?;
1156 Ok(String::new())
1157 } else {
1158 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1159 .await
1160 .map_err(|e| match e.downcast::<reqwest::Error>() {
1161 Ok(re) => friendly_error(&re, node),
1162 Err(other) => other,
1163 })?;
1164 Ok(output)
1165 }
1166 }
1167 Commands::Interventions { name } => {
1168 let resp = authed_get(
1169 &client,
1170 format!("{url}/api/v1/sessions/{name}/interventions"),
1171 token.as_deref(),
1172 )
1173 .send()
1174 .await
1175 .map_err(|e| friendly_error(&e, node))?;
1176 let text = ok_or_api_error(resp).await?;
1177 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1178 Ok(format_interventions(&events))
1179 }
1180 Commands::Ui => {
1181 let dashboard = base_url(node);
1182 open_browser(&dashboard)?;
1183 Ok(format!("Opening {dashboard}"))
1184 }
1185 Commands::Resume { name } => {
1186 let resp = authed_post(
1187 &client,
1188 format!("{url}/api/v1/sessions/{name}/resume"),
1189 token.as_deref(),
1190 )
1191 .send()
1192 .await
1193 .map_err(|e| friendly_error(&e, node))?;
1194 let text = ok_or_api_error(resp).await?;
1195 let session: Session = serde_json::from_str(&text)?;
1196 let backend_id = session
1197 .backend_session_id
1198 .as_deref()
1199 .unwrap_or(&session.name);
1200 eprintln!("Resumed session \"{}\"", session.name);
1201 attach_session(backend_id)?;
1202 Ok(format!("Detached from session \"{}\".", session.name))
1203 }
1204 Commands::Schedule { action } => execute_schedule(action, node),
1205 }
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210 use super::*;
1211
1212 #[test]
1213 fn test_base_url() {
1214 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1215 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1216 }
1217
1218 #[test]
1219 fn test_cli_parse_list() {
1220 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1221 assert_eq!(cli.node, "localhost:7433");
1222 assert!(matches!(cli.command, Some(Commands::List)));
1223 }
1224
1225 #[test]
1226 fn test_cli_parse_nodes() {
1227 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1228 assert!(matches!(cli.command, Some(Commands::Nodes)));
1229 }
1230
1231 #[test]
1232 fn test_cli_parse_ui() {
1233 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1234 assert!(matches!(cli.command, Some(Commands::Ui)));
1235 }
1236
1237 #[test]
1238 fn test_cli_parse_ui_custom_node() {
1239 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1240 assert_eq!(cli.node, "mac-mini:7433");
1242 assert_eq!(cli.path.as_deref(), Some("ui"));
1243 }
1244
1245 #[test]
1246 fn test_build_open_command() {
1247 let cmd = build_open_command("http://localhost:7433");
1248 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1249 assert_eq!(args, vec!["http://localhost:7433"]);
1250 #[cfg(target_os = "macos")]
1251 assert_eq!(cmd.get_program(), "open");
1252 #[cfg(target_os = "linux")]
1253 assert_eq!(cmd.get_program(), "xdg-open");
1254 }
1255
1256 #[test]
1257 fn test_cli_parse_spawn() {
1258 let cli = Cli::try_parse_from([
1259 "pulpo",
1260 "spawn",
1261 "my-task",
1262 "--workdir",
1263 "/tmp/repo",
1264 "--",
1265 "claude",
1266 "-p",
1267 "Fix the bug",
1268 ])
1269 .unwrap();
1270 assert!(matches!(
1271 &cli.command,
1272 Some(Commands::Spawn { name, workdir, command, .. })
1273 if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
1274 && command == &["claude", "-p", "Fix the bug"]
1275 ));
1276 }
1277
1278 #[test]
1279 fn test_cli_parse_spawn_with_ink() {
1280 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--ink", "coder"]).unwrap();
1281 assert!(matches!(
1282 &cli.command,
1283 Some(Commands::Spawn { ink, .. }) if ink.as_deref() == Some("coder")
1284 ));
1285 }
1286
1287 #[test]
1288 fn test_cli_parse_spawn_with_description() {
1289 let cli =
1290 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--description", "Fix the bug"])
1291 .unwrap();
1292 assert!(matches!(
1293 &cli.command,
1294 Some(Commands::Spawn { description, .. }) if description.as_deref() == Some("Fix the bug")
1295 ));
1296 }
1297
1298 #[test]
1299 fn test_cli_parse_spawn_name_positional() {
1300 let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "--", "echo", "hello"]).unwrap();
1301 assert!(matches!(
1302 &cli.command,
1303 Some(Commands::Spawn { name, command, .. })
1304 if name.as_deref() == Some("portal") && command == &["echo", "hello"]
1305 ));
1306 }
1307
1308 #[test]
1309 fn test_cli_parse_spawn_no_command() {
1310 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
1311 assert!(matches!(
1312 &cli.command,
1313 Some(Commands::Spawn { command, .. }) if command.is_empty()
1314 ));
1315 }
1316
1317 #[test]
1318 fn test_cli_parse_spawn_idle_threshold() {
1319 let cli =
1320 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--idle-threshold", "0"]).unwrap();
1321 assert!(matches!(
1322 &cli.command,
1323 Some(Commands::Spawn { idle_threshold, .. }) if *idle_threshold == Some(0)
1324 ));
1325 }
1326
/// `--idle-threshold 60` is stored as `Some(60)`.
#[test]
fn test_cli_parse_spawn_idle_threshold_60() {
    let argv = ["pulpo", "spawn", "my-task", "--idle-threshold", "60"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Spawn { idle_threshold, .. }) = &parsed.command else {
        panic!("expected Commands::Spawn, got {:?}", parsed.command);
    };
    assert_eq!(*idle_threshold, Some(60));
}
1336
/// The long flag `--detach` sets `detach` to true.
#[test]
fn test_cli_parse_spawn_detach() {
    let parsed = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach"]).unwrap();
    let Some(Commands::Spawn { detach, .. }) = &parsed.command else {
        panic!("expected Commands::Spawn, got {:?}", parsed.command);
    };
    assert!(*detach);
}
1345
/// The short flag `-d` sets `detach` to true.
#[test]
fn test_cli_parse_spawn_detach_short() {
    let parsed = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d"]).unwrap();
    let Some(Commands::Spawn { detach, .. }) = &parsed.command else {
        panic!("expected Commands::Spawn, got {:?}", parsed.command);
    };
    assert!(*detach);
}
1354
/// Without a detach flag, `detach` is false.
#[test]
fn test_cli_parse_spawn_detach_default() {
    let parsed = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
    let Some(Commands::Spawn { detach, .. }) = &parsed.command else {
        panic!("expected Commands::Spawn, got {:?}", parsed.command);
    };
    assert!(!*detach);
}
1363
/// `logs <name>` with no options uses 100 lines and does not follow.
#[test]
fn test_cli_parse_logs() {
    let parsed = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
    let Some(Commands::Logs { name, lines, follow }) = &parsed.command else {
        panic!("expected Commands::Logs, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(*lines, 100);
    assert!(!*follow);
}
1372
/// `--lines 50` overrides the line count while `follow` stays false.
#[test]
fn test_cli_parse_logs_with_lines() {
    let argv = ["pulpo", "logs", "my-session", "--lines", "50"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Some(Commands::Logs { name, lines, follow }) = &parsed.command else {
        panic!("expected Commands::Logs, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(*lines, 50);
    assert!(!*follow);
}
1381
/// The long flag `--follow` sets `follow` to true.
#[test]
fn test_cli_parse_logs_follow() {
    let parsed = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
    let Some(Commands::Logs { name, follow, .. }) = &parsed.command else {
        panic!("expected Commands::Logs, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(*follow);
}
1390
/// The short flag `-f` sets `follow` to true.
#[test]
fn test_cli_parse_logs_follow_short() {
    let parsed = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
    let Some(Commands::Logs { name, follow, .. }) = &parsed.command else {
        panic!("expected Commands::Logs, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(*follow);
}
1399
/// `kill <name>` parses into `Commands::Kill` carrying the session name.
#[test]
fn test_cli_parse_kill() {
    let parsed = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
    let Some(Commands::Kill { name }) = &parsed.command else {
        panic!("expected Commands::Kill, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
1408
/// `delete <name>` parses into `Commands::Delete` carrying the session name.
#[test]
fn test_cli_parse_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
    let Some(Commands::Delete { name }) = &parsed.command else {
        panic!("expected Commands::Delete, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
1417
/// `resume <name>` parses into `Commands::Resume` carrying the session name.
#[test]
fn test_cli_parse_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
    let Some(Commands::Resume { name }) = &parsed.command else {
        panic!("expected Commands::Resume, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
1426
/// `input <name> <text>` captures both the session name and the text.
#[test]
fn test_cli_parse_input() {
    let parsed = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
    let Some(Commands::Input { name, text }) = &parsed.command else {
        panic!("expected Commands::Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("yes"));
}
1435
/// `input <name>` without text leaves `text` as `None`.
#[test]
fn test_cli_parse_input_no_text() {
    let parsed = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
    let Some(Commands::Input { name, text }) = &parsed.command else {
        panic!("expected Commands::Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(text.is_none());
}
1444
/// The `i` alias resolves to `Commands::Input`.
#[test]
fn test_cli_parse_input_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
    let Some(Commands::Input { name, text }) = &parsed.command else {
        panic!("expected Commands::Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("y"));
}
1453
/// `--node` overrides the node address. With a top-level argument present, the word
/// `list` is expected to land in the positional `path` (presumably a consequence of
/// `args_conflicts_with_subcommands` on `Cli`).
#[test]
fn test_cli_parse_custom_node() {
    let argv = ["pulpo", "--node", "win-pc:8080", "list"];
    let Cli { node, path, .. } = Cli::try_parse_from(argv).unwrap();
    assert_eq!(node, "win-pc:8080");
    assert_eq!(path.as_deref(), Some("list"));
}
1461
/// `--version` makes parsing return an error of kind `DisplayVersion`.
#[test]
fn test_cli_version() {
    let kind = Cli::try_parse_from(["pulpo", "--version"])
        .unwrap_err()
        .kind();
    assert_eq!(kind, clap::error::ErrorKind::DisplayVersion);
}
1469
/// A bare `pulpo` invocation parses successfully with neither a subcommand nor a path.
#[test]
fn test_cli_parse_no_subcommand_succeeds() {
    let Cli { command, path, .. } = Cli::try_parse_from(["pulpo"]).unwrap();
    assert!(command.is_none());
    assert!(path.is_none());
}
1476
/// The `Debug` rendering of a parsed `Cli` mentions the `List` variant.
#[test]
fn test_cli_debug() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    let rendered = format!("{:?}", parsed);
    assert!(rendered.contains("List"));
}
1483
/// `Commands::List` debug-formats as exactly `List`.
#[test]
fn test_commands_debug() {
    let rendered = format!("{:?}", Commands::List);
    assert_eq!(rendered, "List");
}
1489
/// Canned JSON for a single session (name `repo`, status `active`), used as a response
/// body by the stub server and embedded in the create-session response in these tests.
const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","command":"claude -p 'Fix bug'","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1492
1493 fn test_create_response_json() -> String {
1495 format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1496 }
1497
1498 async fn start_test_server() -> String {
1500 use axum::http::StatusCode;
1501 use axum::{
1502 Json, Router,
1503 routing::{get, post},
1504 };
1505
1506 let create_json = test_create_response_json();
1507
1508 let app = Router::new()
1509 .route(
1510 "/api/v1/sessions",
1511 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1512 (StatusCode::CREATED, create_json.clone())
1513 }),
1514 )
1515 .route(
1516 "/api/v1/sessions/{id}",
1517 get(|| async { TEST_SESSION_JSON.to_owned() })
1518 .delete(|| async { StatusCode::NO_CONTENT }),
1519 )
1520 .route(
1521 "/api/v1/sessions/{id}/kill",
1522 post(|| async { StatusCode::NO_CONTENT }),
1523 )
1524 .route(
1525 "/api/v1/sessions/{id}/output",
1526 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1527 )
1528 .route(
1529 "/api/v1/peers",
1530 get(|| async {
1531 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1532 }),
1533 )
1534 .route(
1535 "/api/v1/sessions/{id}/resume",
1536 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1537 )
1538 .route(
1539 "/api/v1/sessions/{id}/interventions",
1540 get(|| async { "[]".to_owned() }),
1541 )
1542 .route(
1543 "/api/v1/sessions/{id}/input",
1544 post(|| async { StatusCode::NO_CONTENT }),
1545 );
1546
1547 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1548 let addr = listener.local_addr().unwrap();
1549 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1550 format!("127.0.0.1:{}", addr.port())
1551 }
1552
1553 #[tokio::test]
1554 async fn test_execute_list_success() {
1555 let node = start_test_server().await;
1556 let cli = Cli {
1557 node,
1558 token: None,
1559 command: Some(Commands::List),
1560 path: None,
1561 };
1562 let result = execute(&cli).await.unwrap();
1563 assert_eq!(result, "No sessions.");
1564 }
1565
1566 #[tokio::test]
1567 async fn test_execute_nodes_success() {
1568 let node = start_test_server().await;
1569 let cli = Cli {
1570 node,
1571 token: None,
1572 command: Some(Commands::Nodes),
1573 path: None,
1574 };
1575 let result = execute(&cli).await.unwrap();
1576 assert!(result.contains("test"));
1577 assert!(result.contains("(local)"));
1578 assert!(result.contains("NAME"));
1579 }
1580
1581 #[tokio::test]
1582 async fn test_execute_spawn_success() {
1583 let node = start_test_server().await;
1584 let cli = Cli {
1585 node,
1586 token: None,
1587 command: Some(Commands::Spawn {
1588 name: Some("test".into()),
1589 workdir: Some("/tmp/repo".into()),
1590 ink: None,
1591 description: None,
1592 detach: true,
1593 idle_threshold: None,
1594 auto: false,
1595 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1596 }),
1597 path: None,
1598 };
1599 let result = execute(&cli).await.unwrap();
1600 assert!(result.contains("Created session"));
1601 assert!(result.contains("repo"));
1602 }
1603
1604 #[tokio::test]
1605 async fn test_execute_spawn_with_all_flags() {
1606 let node = start_test_server().await;
1607 let cli = Cli {
1608 node,
1609 token: None,
1610 command: Some(Commands::Spawn {
1611 name: Some("test".into()),
1612 workdir: Some("/tmp/repo".into()),
1613 ink: Some("coder".into()),
1614 description: Some("Fix the bug".into()),
1615 detach: true,
1616 idle_threshold: None,
1617 auto: false,
1618 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1619 }),
1620 path: None,
1621 };
1622 let result = execute(&cli).await.unwrap();
1623 assert!(result.contains("Created session"));
1624 }
1625
1626 #[tokio::test]
1627 async fn test_execute_spawn_no_command() {
1628 let node = start_test_server().await;
1629 let cli = Cli {
1630 node,
1631 token: None,
1632 command: Some(Commands::Spawn {
1633 name: Some("test".into()),
1634 workdir: Some("/tmp/repo".into()),
1635 ink: None,
1636 description: None,
1637 detach: true,
1638 idle_threshold: None,
1639 auto: false,
1640 command: vec![],
1641 }),
1642 path: None,
1643 };
1644 let result = execute(&cli).await.unwrap();
1645 assert!(result.contains("Created session"));
1646 }
1647
1648 #[tokio::test]
1649 async fn test_execute_spawn_with_name() {
1650 let node = start_test_server().await;
1651 let cli = Cli {
1652 node,
1653 token: None,
1654 command: Some(Commands::Spawn {
1655 name: Some("my-task".into()),
1656 workdir: Some("/tmp/repo".into()),
1657 ink: None,
1658 description: None,
1659 detach: true,
1660 idle_threshold: None,
1661 auto: false,
1662 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1663 }),
1664 path: None,
1665 };
1666 let result = execute(&cli).await.unwrap();
1667 assert!(result.contains("Created session"));
1668 }
1669
1670 #[tokio::test]
1671 async fn test_execute_spawn_auto_attach() {
1672 let node = start_test_server().await;
1673 let cli = Cli {
1674 node,
1675 token: None,
1676 command: Some(Commands::Spawn {
1677 name: Some("test".into()),
1678 workdir: Some("/tmp/repo".into()),
1679 ink: None,
1680 description: None,
1681 detach: false,
1682 idle_threshold: None,
1683 auto: false,
1684 command: vec!["claude".into(), "-p".into(), "Fix bug".into()],
1685 }),
1686 path: None,
1687 };
1688 let result = execute(&cli).await.unwrap();
1689 assert!(result.contains("Detached from session"));
1691 }
1692
1693 #[tokio::test]
1694 async fn test_execute_kill_success() {
1695 let node = start_test_server().await;
1696 let cli = Cli {
1697 node,
1698 token: None,
1699 command: Some(Commands::Kill {
1700 name: "test-session".into(),
1701 }),
1702 path: None,
1703 };
1704 let result = execute(&cli).await.unwrap();
1705 assert!(result.contains("killed"));
1706 }
1707
1708 #[tokio::test]
1709 async fn test_execute_delete_success() {
1710 let node = start_test_server().await;
1711 let cli = Cli {
1712 node,
1713 token: None,
1714 command: Some(Commands::Delete {
1715 name: "test-session".into(),
1716 }),
1717 path: None,
1718 };
1719 let result = execute(&cli).await.unwrap();
1720 assert!(result.contains("deleted"));
1721 }
1722
1723 #[tokio::test]
1724 async fn test_execute_logs_success() {
1725 let node = start_test_server().await;
1726 let cli = Cli {
1727 node,
1728 token: None,
1729 command: Some(Commands::Logs {
1730 name: "test-session".into(),
1731 lines: 50,
1732 follow: false,
1733 }),
1734 path: None,
1735 };
1736 let result = execute(&cli).await.unwrap();
1737 assert!(result.contains("test output"));
1738 }
1739
1740 #[tokio::test]
1741 async fn test_execute_list_connection_refused() {
1742 let cli = Cli {
1743 node: "localhost:1".into(),
1744 token: None,
1745 command: Some(Commands::List),
1746 path: None,
1747 };
1748 let result = execute(&cli).await;
1749 let err = result.unwrap_err().to_string();
1750 assert!(
1751 err.contains("Could not connect to pulpod"),
1752 "Expected friendly error, got: {err}"
1753 );
1754 assert!(err.contains("localhost:1"));
1755 }
1756
1757 #[tokio::test]
1758 async fn test_execute_nodes_connection_refused() {
1759 let cli = Cli {
1760 node: "localhost:1".into(),
1761 token: None,
1762 command: Some(Commands::Nodes),
1763 path: None,
1764 };
1765 let result = execute(&cli).await;
1766 let err = result.unwrap_err().to_string();
1767 assert!(err.contains("Could not connect to pulpod"));
1768 }
1769
1770 #[tokio::test]
1771 async fn test_execute_kill_error_response() {
1772 use axum::{Router, http::StatusCode, routing::post};
1773
1774 let app = Router::new().route(
1775 "/api/v1/sessions/{id}/kill",
1776 post(|| async {
1777 (
1778 StatusCode::NOT_FOUND,
1779 "{\"error\":\"session not found: test-session\"}",
1780 )
1781 }),
1782 );
1783 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1784 let addr = listener.local_addr().unwrap();
1785 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1786 let node = format!("127.0.0.1:{}", addr.port());
1787
1788 let cli = Cli {
1789 node,
1790 token: None,
1791 command: Some(Commands::Kill {
1792 name: "test-session".into(),
1793 }),
1794 path: None,
1795 };
1796 let err = execute(&cli).await.unwrap_err();
1797 assert_eq!(err.to_string(), "session not found: test-session");
1798 }
1799
1800 #[tokio::test]
1801 async fn test_execute_delete_error_response() {
1802 use axum::{Router, http::StatusCode, routing::delete};
1803
1804 let app = Router::new().route(
1805 "/api/v1/sessions/{id}",
1806 delete(|| async {
1807 (
1808 StatusCode::CONFLICT,
1809 "{\"error\":\"cannot delete session in 'running' state\"}",
1810 )
1811 }),
1812 );
1813 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1814 let addr = listener.local_addr().unwrap();
1815 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1816 let node = format!("127.0.0.1:{}", addr.port());
1817
1818 let cli = Cli {
1819 node,
1820 token: None,
1821 command: Some(Commands::Delete {
1822 name: "test-session".into(),
1823 }),
1824 path: None,
1825 };
1826 let err = execute(&cli).await.unwrap_err();
1827 assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1828 }
1829
1830 #[tokio::test]
1831 async fn test_execute_logs_error_response() {
1832 use axum::{Router, http::StatusCode, routing::get};
1833
1834 let app = Router::new().route(
1835 "/api/v1/sessions/{id}/output",
1836 get(|| async {
1837 (
1838 StatusCode::NOT_FOUND,
1839 "{\"error\":\"session not found: ghost\"}",
1840 )
1841 }),
1842 );
1843 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1844 let addr = listener.local_addr().unwrap();
1845 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1846 let node = format!("127.0.0.1:{}", addr.port());
1847
1848 let cli = Cli {
1849 node,
1850 token: None,
1851 command: Some(Commands::Logs {
1852 name: "ghost".into(),
1853 lines: 50,
1854 follow: false,
1855 }),
1856 path: None,
1857 };
1858 let err = execute(&cli).await.unwrap_err();
1859 assert_eq!(err.to_string(), "session not found: ghost");
1860 }
1861
1862 #[tokio::test]
1863 async fn test_execute_resume_error_response() {
1864 use axum::{Router, http::StatusCode, routing::post};
1865
1866 let app = Router::new().route(
1867 "/api/v1/sessions/{id}/resume",
1868 post(|| async {
1869 (
1870 StatusCode::BAD_REQUEST,
1871 "{\"error\":\"session is not lost (status: active)\"}",
1872 )
1873 }),
1874 );
1875 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1876 let addr = listener.local_addr().unwrap();
1877 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1878 let node = format!("127.0.0.1:{}", addr.port());
1879
1880 let cli = Cli {
1881 node,
1882 token: None,
1883 command: Some(Commands::Resume {
1884 name: "test-session".into(),
1885 }),
1886 path: None,
1887 };
1888 let err = execute(&cli).await.unwrap_err();
1889 assert_eq!(err.to_string(), "session is not lost (status: active)");
1890 }
1891
1892 #[tokio::test]
1893 async fn test_execute_spawn_error_response() {
1894 use axum::{Router, http::StatusCode, routing::post};
1895
1896 let app = Router::new().route(
1897 "/api/v1/sessions",
1898 post(|| async {
1899 (
1900 StatusCode::INTERNAL_SERVER_ERROR,
1901 "{\"error\":\"failed to spawn session\"}",
1902 )
1903 }),
1904 );
1905 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1906 let addr = listener.local_addr().unwrap();
1907 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1908 let node = format!("127.0.0.1:{}", addr.port());
1909
1910 let cli = Cli {
1911 node,
1912 token: None,
1913 command: Some(Commands::Spawn {
1914 name: Some("test".into()),
1915 workdir: Some("/tmp/repo".into()),
1916 ink: None,
1917 description: None,
1918 detach: true,
1919 idle_threshold: None,
1920 auto: false,
1921 command: vec!["test".into()],
1922 }),
1923 path: None,
1924 };
1925 let err = execute(&cli).await.unwrap_err();
1926 assert_eq!(err.to_string(), "failed to spawn session");
1927 }
1928
1929 #[tokio::test]
1930 async fn test_execute_interventions_error_response() {
1931 use axum::{Router, http::StatusCode, routing::get};
1932
1933 let app = Router::new().route(
1934 "/api/v1/sessions/{id}/interventions",
1935 get(|| async {
1936 (
1937 StatusCode::NOT_FOUND,
1938 "{\"error\":\"session not found: ghost\"}",
1939 )
1940 }),
1941 );
1942 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1943 let addr = listener.local_addr().unwrap();
1944 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1945 let node = format!("127.0.0.1:{}", addr.port());
1946
1947 let cli = Cli {
1948 node,
1949 token: None,
1950 command: Some(Commands::Interventions {
1951 name: "ghost".into(),
1952 }),
1953 path: None,
1954 };
1955 let err = execute(&cli).await.unwrap_err();
1956 assert_eq!(err.to_string(), "session not found: ghost");
1957 }
1958
1959 #[tokio::test]
1960 async fn test_execute_resume_success() {
1961 let node = start_test_server().await;
1962 let cli = Cli {
1963 node,
1964 token: None,
1965 command: Some(Commands::Resume {
1966 name: "test-session".into(),
1967 }),
1968 path: None,
1969 };
1970 let result = execute(&cli).await.unwrap();
1971 assert!(result.contains("Detached from session"));
1972 }
1973
1974 #[tokio::test]
1975 async fn test_execute_input_success() {
1976 let node = start_test_server().await;
1977 let cli = Cli {
1978 node,
1979 token: None,
1980 command: Some(Commands::Input {
1981 name: "test-session".into(),
1982 text: Some("yes".into()),
1983 }),
1984 path: None,
1985 };
1986 let result = execute(&cli).await.unwrap();
1987 assert!(result.contains("Sent input to session test-session"));
1988 }
1989
1990 #[tokio::test]
1991 async fn test_execute_input_no_text() {
1992 let node = start_test_server().await;
1993 let cli = Cli {
1994 node,
1995 token: None,
1996 command: Some(Commands::Input {
1997 name: "test-session".into(),
1998 text: None,
1999 }),
2000 path: None,
2001 };
2002 let result = execute(&cli).await.unwrap();
2003 assert!(result.contains("Sent input to session test-session"));
2004 }
2005
2006 #[tokio::test]
2007 async fn test_execute_input_connection_refused() {
2008 let cli = Cli {
2009 node: "localhost:1".into(),
2010 token: None,
2011 command: Some(Commands::Input {
2012 name: "test".into(),
2013 text: Some("y".into()),
2014 }),
2015 path: None,
2016 };
2017 let result = execute(&cli).await;
2018 let err = result.unwrap_err().to_string();
2019 assert!(err.contains("Could not connect to pulpod"));
2020 }
2021
2022 #[tokio::test]
2023 async fn test_execute_input_error_response() {
2024 use axum::{Router, http::StatusCode, routing::post};
2025
2026 let app = Router::new().route(
2027 "/api/v1/sessions/{id}/input",
2028 post(|| async {
2029 (
2030 StatusCode::NOT_FOUND,
2031 "{\"error\":\"session not found: ghost\"}",
2032 )
2033 }),
2034 );
2035 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2036 let addr = listener.local_addr().unwrap();
2037 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2038 let node = format!("127.0.0.1:{}", addr.port());
2039
2040 let cli = Cli {
2041 node,
2042 token: None,
2043 command: Some(Commands::Input {
2044 name: "ghost".into(),
2045 text: Some("y".into()),
2046 }),
2047 path: None,
2048 };
2049 let err = execute(&cli).await.unwrap_err();
2050 assert_eq!(err.to_string(), "session not found: ghost");
2051 }
2052
2053 #[tokio::test]
2054 async fn test_execute_ui() {
2055 let cli = Cli {
2056 node: "localhost:7433".into(),
2057 token: None,
2058 command: Some(Commands::Ui),
2059 path: None,
2060 };
2061 let result = execute(&cli).await.unwrap();
2062 assert!(result.contains("Opening"));
2063 assert!(result.contains("http://localhost:7433"));
2064 }
2065
2066 #[tokio::test]
2067 async fn test_execute_ui_custom_node() {
2068 let cli = Cli {
2069 node: "mac-mini:7433".into(),
2070 token: None,
2071 command: Some(Commands::Ui),
2072 path: None,
2073 };
2074 let result = execute(&cli).await.unwrap();
2075 assert!(result.contains("http://mac-mini:7433"));
2076 }
2077
/// An empty session slice is rendered as `No sessions.`.
#[test]
fn test_format_sessions_empty() {
    let rendered = format_sessions(&[]);
    assert_eq!(rendered, "No sessions.");
}
2082
/// A single active session is rendered with table headers plus its name, status and command.
#[test]
fn test_format_sessions_with_data() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "my-api".into(),
        workdir: "/tmp/repo".into(),
        command: "claude -p 'Fix the bug'".into(),
        description: Some("Fix the bug".into()),
        status: SessionStatus::Active,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let rendered = format_sessions(&[session]);
    let expected = [
        "NAME",
        "COMMAND",
        "my-api",
        "active",
        "claude -p 'Fix the bug'",
    ];
    for needle in expected {
        assert!(rendered.contains(needle));
    }
}
2117
/// A session with a long command is expected to be rendered with an ellipsis (`...`).
#[test]
fn test_format_sessions_long_command_truncated() {
    use chrono::Utc;
    use pulpo_common::session::SessionStatus;
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        command:
            "claude -p 'A very long command that exceeds fifty characters in total length here'"
                .into(),
        description: None,
        status: SessionStatus::Ready,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        metadata: None,
        ink: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        idle_threshold_secs: None,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let rendered = format_sessions(&[session]);
    assert!(rendered.contains("..."));
}
2150
/// `format_nodes` output names the local node (tagged `(local)`), the peer, and the
/// peer's session count.
#[test]
fn test_format_nodes() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let resp = PeersResponse {
        local: NodeInfo {
            name: "mac-mini".into(),
            hostname: "h".into(),
            os: "macos".into(),
            arch: "arm64".into(),
            cpus: 8,
            memory_mb: 16384,
            gpu: None,
        },
        peers: vec![PeerInfo {
            name: "win-pc".into(),
            address: "win-pc:7433".into(),
            status: PeerStatus::Online,
            node_info: None,
            // "25" occurs in no other value in this fixture (address, cpus, memory_mb,
            // arch), so the count assertion below can only be satisfied by the session
            // count itself. A count of 3 would also match the `3` in `win-pc:7433` or
            // `16384` if those columns are rendered.
            session_count: Some(25),
            source: PeerSource::Configured,
        }],
    };
    let output = format_nodes(&resp);
    assert!(output.contains("mac-mini"));
    assert!(output.contains("(local)"));
    assert!(output.contains("win-pc"));
    assert!(output.contains("25"));
}
2181
/// An offline peer without a session count: the output mentions `offline`, and the
/// third output line contains a `-`.
#[test]
fn test_format_nodes_no_session_count() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "local".into(),
        hostname: "h".into(),
        os: "linux".into(),
        arch: "x86_64".into(),
        cpus: 4,
        memory_mb: 8192,
        gpu: None,
    };
    let offline_peer = PeerInfo {
        name: "peer".into(),
        address: "peer:7433".into(),
        status: PeerStatus::Offline,
        node_info: None,
        session_count: None,
        source: PeerSource::Configured,
    };
    let rendered = format_nodes(&PeersResponse {
        local,
        peers: vec![offline_peer],
    });
    assert!(rendered.contains("offline"));
    // Panics (failing the test) if the output has fewer than three lines.
    let third_line = rendered.lines().nth(2).unwrap();
    assert!(third_line.contains('-'));
}
2212
2213 #[tokio::test]
2214 async fn test_execute_resume_connection_refused() {
2215 let cli = Cli {
2216 node: "localhost:1".into(),
2217 token: None,
2218 command: Some(Commands::Resume {
2219 name: "test".into(),
2220 }),
2221 path: None,
2222 };
2223 let result = execute(&cli).await;
2224 let err = result.unwrap_err().to_string();
2225 assert!(err.contains("Could not connect to pulpod"));
2226 }
2227
2228 #[tokio::test]
2229 async fn test_execute_spawn_connection_refused() {
2230 let cli = Cli {
2231 node: "localhost:1".into(),
2232 token: None,
2233 command: Some(Commands::Spawn {
2234 name: Some("test".into()),
2235 workdir: Some("/tmp".into()),
2236 ink: None,
2237 description: None,
2238 detach: true,
2239 idle_threshold: None,
2240 auto: false,
2241 command: vec!["test".into()],
2242 }),
2243 path: None,
2244 };
2245 let result = execute(&cli).await;
2246 let err = result.unwrap_err().to_string();
2247 assert!(err.contains("Could not connect to pulpod"));
2248 }
2249
2250 #[tokio::test]
2251 async fn test_execute_kill_connection_refused() {
2252 let cli = Cli {
2253 node: "localhost:1".into(),
2254 token: None,
2255 command: Some(Commands::Kill {
2256 name: "test".into(),
2257 }),
2258 path: None,
2259 };
2260 let result = execute(&cli).await;
2261 let err = result.unwrap_err().to_string();
2262 assert!(err.contains("Could not connect to pulpod"));
2263 }
2264
2265 #[tokio::test]
2266 async fn test_execute_delete_connection_refused() {
2267 let cli = Cli {
2268 node: "localhost:1".into(),
2269 token: None,
2270 command: Some(Commands::Delete {
2271 name: "test".into(),
2272 }),
2273 path: None,
2274 };
2275 let result = execute(&cli).await;
2276 let err = result.unwrap_err().to_string();
2277 assert!(err.contains("Could not connect to pulpod"));
2278 }
2279
2280 #[tokio::test]
2281 async fn test_execute_logs_connection_refused() {
2282 let cli = Cli {
2283 node: "localhost:1".into(),
2284 token: None,
2285 command: Some(Commands::Logs {
2286 name: "test".into(),
2287 lines: 50,
2288 follow: false,
2289 }),
2290 path: None,
2291 };
2292 let result = execute(&cli).await;
2293 let err = result.unwrap_err().to_string();
2294 assert!(err.contains("Could not connect to pulpod"));
2295 }
2296
2297 #[tokio::test]
2298 async fn test_friendly_error_connect() {
2299 let err = reqwest::Client::new()
2301 .get("http://127.0.0.1:1")
2302 .send()
2303 .await
2304 .unwrap_err();
2305 let friendly = friendly_error(&err, "test-node:1");
2306 let msg = friendly.to_string();
2307 assert!(
2308 msg.contains("Could not connect"),
2309 "Expected connect message, got: {msg}"
2310 );
2311 }
2312
2313 #[tokio::test]
2314 async fn test_friendly_error_other() {
2315 let err = reqwest::Client::new()
2317 .get("http://[::invalid::url")
2318 .send()
2319 .await
2320 .unwrap_err();
2321 let friendly = friendly_error(&err, "bad-host");
2322 let msg = friendly.to_string();
2323 assert!(
2324 msg.contains("Network error"),
2325 "Expected network error message, got: {msg}"
2326 );
2327 assert!(msg.contains("bad-host"));
2328 }
2329
/// `is_localhost` accepts `localhost`, `127.0.0.1` and `::1` (with or without a port)
/// and rejects other hostnames and addresses.
#[test]
fn test_is_localhost_variants() {
    let local_nodes = [
        "localhost:7433",
        "127.0.0.1:7433",
        "[::1]:7433",
        "::1",
        "localhost",
    ];
    for node in local_nodes {
        assert!(is_localhost(node));
    }

    let remote_nodes = ["mac-mini:7433", "192.168.1.100:7433"];
    for node in remote_nodes {
        assert!(!is_localhost(node));
    }
}
2342
/// `authed_get` with a token adds an `authorization: Bearer <token>` header.
#[test]
fn test_authed_get_with_token() {
    let http = reqwest::Client::new();
    let built = authed_get(&http, "http://h:1/api".into(), Some("tok"))
        .build()
        .unwrap();
    let header = built.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer tok");
}
2357
/// `authed_get` without a token adds no `authorization` header.
#[test]
fn test_authed_get_without_token() {
    let http = reqwest::Client::new();
    let built = authed_get(&http, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert!(!built.headers().contains_key("authorization"));
}
2366
/// `authed_post` with a token adds an `authorization: Bearer <token>` header.
#[test]
fn test_authed_post_with_token() {
    let http = reqwest::Client::new();
    let built = authed_post(&http, "http://h:1/api".into(), Some("secret"))
        .build()
        .unwrap();
    let header = built.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer secret");
}
2381
/// `authed_post` without a token adds no `authorization` header.
#[test]
fn test_authed_post_without_token() {
    let http = reqwest::Client::new();
    let built = authed_post(&http, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert!(!built.headers().contains_key("authorization"));
}
2390
/// `authed_delete` with a token adds an `authorization: Bearer <token>` header.
#[test]
fn test_authed_delete_with_token() {
    let http = reqwest::Client::new();
    let built = authed_delete(&http, "http://h:1/api".into(), Some("del-tok"))
        .build()
        .unwrap();
    let header = built.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer del-tok");
}
2405
/// `authed_delete` without a token adds no `authorization` header.
#[test]
fn test_authed_delete_without_token() {
    let http = reqwest::Client::new();
    let built = authed_delete(&http, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert!(!built.headers().contains_key("authorization"));
}
2414
2415 #[tokio::test]
2416 async fn test_resolve_token_explicit() {
2417 let client = reqwest::Client::new();
2418 let token =
2419 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2420 assert_eq!(token, Some("my-tok".into()));
2421 }
2422
2423 #[tokio::test]
2424 async fn test_resolve_token_remote_no_explicit() {
2425 let client = reqwest::Client::new();
2426 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2427 assert_eq!(token, None);
2428 }
2429
2430 #[tokio::test]
2431 async fn test_resolve_token_localhost_auto_discover() {
2432 use axum::{Json, Router, routing::get};
2433
2434 let app = Router::new().route(
2435 "/api/v1/auth/token",
2436 get(|| async {
2437 Json(AuthTokenResponse {
2438 token: "discovered".into(),
2439 })
2440 }),
2441 );
2442 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2443 let addr = listener.local_addr().unwrap();
2444 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2445
2446 let node = format!("localhost:{}", addr.port());
2447 let base = base_url(&node);
2448 let client = reqwest::Client::new();
2449 let token = resolve_token(&client, &base, &node, None).await;
2450 assert_eq!(token, Some("discovered".into()));
2451 }
2452
2453 #[tokio::test]
2454 async fn test_discover_token_empty_returns_none() {
2455 use axum::{Json, Router, routing::get};
2456
2457 let app = Router::new().route(
2458 "/api/v1/auth/token",
2459 get(|| async {
2460 Json(AuthTokenResponse {
2461 token: String::new(),
2462 })
2463 }),
2464 );
2465 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2466 let addr = listener.local_addr().unwrap();
2467 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2468
2469 let base = format!("http://127.0.0.1:{}", addr.port());
2470 let client = reqwest::Client::new();
2471 assert_eq!(discover_token(&client, &base).await, None);
2472 }
2473
2474 #[tokio::test]
2475 async fn test_discover_token_unreachable_returns_none() {
2476 let client = reqwest::Client::new();
2477 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2478 }
2479
2480 #[test]
2481 fn test_cli_parse_with_token() {
2482 let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
2483 assert_eq!(cli.token, Some("my-secret".into()));
2484 }
2485
2486 #[test]
2487 fn test_cli_parse_without_token() {
2488 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2489 assert_eq!(cli.token, None);
2490 }
2491
2492 #[tokio::test]
2493 async fn test_execute_with_explicit_token_sends_header() {
2494 use axum::{Router, extract::Request, http::StatusCode, routing::get};
2495
2496 let app = Router::new().route(
2497 "/api/v1/sessions",
2498 get(|req: Request| async move {
2499 let auth = req
2500 .headers()
2501 .get("authorization")
2502 .and_then(|v| v.to_str().ok())
2503 .unwrap_or("");
2504 assert_eq!(auth, "Bearer test-token");
2505 (StatusCode::OK, "[]".to_owned())
2506 }),
2507 );
2508 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2509 let addr = listener.local_addr().unwrap();
2510 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2511 let node = format!("127.0.0.1:{}", addr.port());
2512
2513 let cli = Cli {
2514 node,
2515 token: Some("test-token".into()),
2516 command: Some(Commands::List),
2517 path: None,
2518 };
2519 let result = execute(&cli).await.unwrap();
2520 assert_eq!(result, "No sessions.");
2521 }
2522
2523 #[test]
2526 fn test_cli_parse_interventions() {
2527 let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
2528 assert!(matches!(
2529 &cli.command,
2530 Some(Commands::Interventions { name }) if name == "my-session"
2531 ));
2532 }
2533
2534 #[test]
2535 fn test_format_interventions_empty() {
2536 assert_eq!(format_interventions(&[]), "No intervention events.");
2537 }
2538
2539 #[test]
2540 fn test_format_interventions_with_data() {
2541 let events = vec![
2542 InterventionEventResponse {
2543 id: 1,
2544 session_id: "sess-1".into(),
2545 code: None,
2546 reason: "Memory exceeded threshold".into(),
2547 created_at: "2026-01-01T00:00:00Z".into(),
2548 },
2549 InterventionEventResponse {
2550 id: 2,
2551 session_id: "sess-1".into(),
2552 code: None,
2553 reason: "Idle for 10 minutes".into(),
2554 created_at: "2026-01-02T00:00:00Z".into(),
2555 },
2556 ];
2557 let output = format_interventions(&events);
2558 assert!(output.contains("ID"));
2559 assert!(output.contains("TIMESTAMP"));
2560 assert!(output.contains("REASON"));
2561 assert!(output.contains("Memory exceeded threshold"));
2562 assert!(output.contains("Idle for 10 minutes"));
2563 assert!(output.contains("2026-01-01T00:00:00Z"));
2564 }
2565
2566 #[tokio::test]
2567 async fn test_execute_interventions_empty() {
2568 let node = start_test_server().await;
2569 let cli = Cli {
2570 node,
2571 token: None,
2572 command: Some(Commands::Interventions {
2573 name: "my-session".into(),
2574 }),
2575 path: None,
2576 };
2577 let result = execute(&cli).await.unwrap();
2578 assert_eq!(result, "No intervention events.");
2579 }
2580
2581 #[tokio::test]
2582 async fn test_execute_interventions_with_data() {
2583 use axum::{Router, routing::get};
2584
2585 let app = Router::new().route(
2586 "/api/v1/sessions/{id}/interventions",
2587 get(|| async {
2588 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2589 .to_owned()
2590 }),
2591 );
2592 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2593 let addr = listener.local_addr().unwrap();
2594 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2595 let node = format!("127.0.0.1:{}", addr.port());
2596
2597 let cli = Cli {
2598 node,
2599 token: None,
2600 command: Some(Commands::Interventions {
2601 name: "test".into(),
2602 }),
2603 path: None,
2604 };
2605 let result = execute(&cli).await.unwrap();
2606 assert!(result.contains("OOM"));
2607 assert!(result.contains("2026-01-01T00:00:00Z"));
2608 }
2609
2610 #[tokio::test]
2611 async fn test_execute_interventions_connection_refused() {
2612 let cli = Cli {
2613 node: "localhost:1".into(),
2614 token: None,
2615 command: Some(Commands::Interventions {
2616 name: "test".into(),
2617 }),
2618 path: None,
2619 };
2620 let result = execute(&cli).await;
2621 let err = result.unwrap_err().to_string();
2622 assert!(err.contains("Could not connect to pulpod"));
2623 }
2624
2625 #[test]
2628 fn test_build_attach_command() {
2629 let cmd = build_attach_command("my-session");
2630 assert_eq!(cmd.get_program(), "tmux");
2631 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2632 assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
2633 }
2634
2635 #[test]
2636 fn test_cli_parse_attach() {
2637 let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2638 assert!(matches!(
2639 &cli.command,
2640 Some(Commands::Attach { name }) if name == "my-session"
2641 ));
2642 }
2643
2644 #[test]
2645 fn test_cli_parse_attach_alias() {
2646 let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2647 assert!(matches!(
2648 &cli.command,
2649 Some(Commands::Attach { name }) if name == "my-session"
2650 ));
2651 }
2652
2653 #[tokio::test]
2654 async fn test_execute_attach_success() {
2655 let node = start_test_server().await;
2656 let cli = Cli {
2657 node,
2658 token: None,
2659 command: Some(Commands::Attach {
2660 name: "test-session".into(),
2661 }),
2662 path: None,
2663 };
2664 let result = execute(&cli).await.unwrap();
2665 assert!(result.contains("Detached from session test-session"));
2666 }
2667
2668 #[tokio::test]
2669 async fn test_execute_attach_with_backend_session_id() {
2670 use axum::{Router, routing::get};
2671 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2672 let app = Router::new().route(
2673 "/api/v1/sessions/{id}",
2674 get(move || async move { session_json.to_owned() }),
2675 );
2676 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2677 let addr = listener.local_addr().unwrap();
2678 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2679
2680 let cli = Cli {
2681 node: format!("127.0.0.1:{}", addr.port()),
2682 token: None,
2683 command: Some(Commands::Attach {
2684 name: "my-session".into(),
2685 }),
2686 path: None,
2687 };
2688 let result = execute(&cli).await.unwrap();
2689 assert!(result.contains("Detached from session my-session"));
2690 }
2691
2692 #[tokio::test]
2693 async fn test_execute_attach_connection_refused() {
2694 let cli = Cli {
2695 node: "localhost:1".into(),
2696 token: None,
2697 command: Some(Commands::Attach {
2698 name: "test-session".into(),
2699 }),
2700 path: None,
2701 };
2702 let result = execute(&cli).await;
2703 let err = result.unwrap_err().to_string();
2704 assert!(err.contains("Could not connect to pulpod"));
2705 }
2706
2707 #[tokio::test]
2708 async fn test_execute_attach_error_response() {
2709 use axum::{Router, http::StatusCode, routing::get};
2710 let app = Router::new().route(
2711 "/api/v1/sessions/{id}",
2712 get(|| async {
2713 (
2714 StatusCode::NOT_FOUND,
2715 r#"{"error":"session not found"}"#.to_owned(),
2716 )
2717 }),
2718 );
2719 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2720 let addr = listener.local_addr().unwrap();
2721 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2722
2723 let cli = Cli {
2724 node: format!("127.0.0.1:{}", addr.port()),
2725 token: None,
2726 command: Some(Commands::Attach {
2727 name: "nonexistent".into(),
2728 }),
2729 path: None,
2730 };
2731 let result = execute(&cli).await;
2732 let err = result.unwrap_err().to_string();
2733 assert!(err.contains("session not found"));
2734 }
2735
2736 #[tokio::test]
2737 async fn test_execute_attach_stale_session() {
2738 use axum::{Router, routing::get};
2739 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2740 let app = Router::new().route(
2741 "/api/v1/sessions/{id}",
2742 get(move || async move { session_json.to_owned() }),
2743 );
2744 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2745 let addr = listener.local_addr().unwrap();
2746 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2747
2748 let cli = Cli {
2749 node: format!("127.0.0.1:{}", addr.port()),
2750 token: None,
2751 command: Some(Commands::Attach {
2752 name: "stale-sess".into(),
2753 }),
2754 path: None,
2755 };
2756 let result = execute(&cli).await;
2757 let err = result.unwrap_err().to_string();
2758 assert!(err.contains("lost"));
2759 assert!(err.contains("pulpo resume"));
2760 }
2761
2762 #[tokio::test]
2763 async fn test_execute_attach_dead_session() {
2764 use axum::{Router, routing::get};
2765 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2766 let app = Router::new().route(
2767 "/api/v1/sessions/{id}",
2768 get(move || async move { session_json.to_owned() }),
2769 );
2770 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2771 let addr = listener.local_addr().unwrap();
2772 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2773
2774 let cli = Cli {
2775 node: format!("127.0.0.1:{}", addr.port()),
2776 token: None,
2777 command: Some(Commands::Attach {
2778 name: "dead-sess".into(),
2779 }),
2780 path: None,
2781 };
2782 let result = execute(&cli).await;
2783 let err = result.unwrap_err().to_string();
2784 assert!(err.contains("killed"));
2785 assert!(err.contains("cannot attach"));
2786 }
2787
2788 #[test]
2791 fn test_cli_parse_alias_spawn() {
2792 let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "--", "echo", "hello"]).unwrap();
2793 assert!(matches!(&cli.command, Some(Commands::Spawn { .. })));
2794 }
2795
2796 #[test]
2797 fn test_cli_parse_alias_list() {
2798 let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2799 assert!(matches!(&cli.command, Some(Commands::List)));
2800 }
2801
2802 #[test]
2803 fn test_cli_parse_alias_logs() {
2804 let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2805 assert!(matches!(
2806 &cli.command,
2807 Some(Commands::Logs { name, .. }) if name == "my-session"
2808 ));
2809 }
2810
2811 #[test]
2812 fn test_cli_parse_alias_kill() {
2813 let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2814 assert!(matches!(
2815 &cli.command,
2816 Some(Commands::Kill { name }) if name == "my-session"
2817 ));
2818 }
2819
2820 #[test]
2821 fn test_cli_parse_alias_delete() {
2822 let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2823 assert!(matches!(
2824 &cli.command,
2825 Some(Commands::Delete { name }) if name == "my-session"
2826 ));
2827 }
2828
2829 #[test]
2830 fn test_cli_parse_alias_resume() {
2831 let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
2832 assert!(matches!(
2833 &cli.command,
2834 Some(Commands::Resume { name }) if name == "my-session"
2835 ));
2836 }
2837
2838 #[test]
2839 fn test_cli_parse_alias_nodes() {
2840 let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
2841 assert!(matches!(&cli.command, Some(Commands::Nodes)));
2842 }
2843
2844 #[test]
2845 fn test_cli_parse_alias_interventions() {
2846 let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
2847 assert!(matches!(
2848 &cli.command,
2849 Some(Commands::Interventions { name }) if name == "my-session"
2850 ));
2851 }
2852
2853 #[test]
2854 fn test_api_error_json() {
2855 let err = api_error("{\"error\":\"session not found: foo\"}");
2856 assert_eq!(err.to_string(), "session not found: foo");
2857 }
2858
2859 #[test]
2860 fn test_api_error_plain_text() {
2861 let err = api_error("plain text error");
2862 assert_eq!(err.to_string(), "plain text error");
2863 }
2864
2865 #[test]
2868 fn test_diff_output_empty_prev() {
2869 assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
2870 }
2871
2872 #[test]
2873 fn test_diff_output_identical() {
2874 assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
2875 }
2876
2877 #[test]
2878 fn test_diff_output_new_lines_appended() {
2879 let prev = "line1\nline2";
2880 let new = "line1\nline2\nline3\nline4";
2881 assert_eq!(diff_output(prev, new), "line3\nline4");
2882 }
2883
2884 #[test]
2885 fn test_diff_output_scrolled_window() {
2886 let prev = "line1\nline2\nline3";
2888 let new = "line2\nline3\nline4";
2889 assert_eq!(diff_output(prev, new), "line4");
2890 }
2891
2892 #[test]
2893 fn test_diff_output_completely_different() {
2894 let prev = "aaa\nbbb";
2895 let new = "xxx\nyyy";
2896 assert_eq!(diff_output(prev, new), "xxx\nyyy");
2897 }
2898
2899 #[test]
2900 fn test_diff_output_last_line_matches_but_overlap_fails() {
2901 let prev = "aaa\ncommon";
2903 let new = "zzz\ncommon\nnew_line";
2904 assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
2908 }
2909
2910 #[test]
2911 fn test_diff_output_new_empty() {
2912 assert_eq!(diff_output("line1", ""), "");
2913 }
2914
2915 async fn start_follow_test_server() -> String {
2920 use axum::{Router, extract::Path, extract::Query, routing::get};
2921 use std::sync::Arc;
2922 use std::sync::atomic::{AtomicUsize, Ordering};
2923
2924 let call_count = Arc::new(AtomicUsize::new(0));
2925 let output_count = call_count.clone();
2926
2927 let app = Router::new()
2928 .route(
2929 "/api/v1/sessions/{id}/output",
2930 get(
2931 move |_path: Path<String>,
2932 _query: Query<std::collections::HashMap<String, String>>| {
2933 let count = output_count.clone();
2934 async move {
2935 let n = count.fetch_add(1, Ordering::SeqCst);
2936 let output = match n {
2937 0 => "line1\nline2".to_owned(),
2938 1 => "line1\nline2\nline3".to_owned(),
2939 _ => "line2\nline3\nline4\n[pulpo] Agent exited (session: test). Run: pulpo resume test".to_owned(),
2940 };
2941 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2942 }
2943 },
2944 ),
2945 )
2946 .route(
2947 "/api/v1/sessions/{id}",
2948 get(|_path: Path<String>| async {
2949 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"active","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2950 }),
2951 );
2952
2953 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2954 let addr = listener.local_addr().unwrap();
2955 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2956 format!("http://127.0.0.1:{}", addr.port())
2957 }
2958
2959 #[tokio::test]
2960 async fn test_follow_logs_polls_and_exits_on_agent_exit_marker() {
2961 let base = start_follow_test_server().await;
2962 let client = reqwest::Client::new();
2963 let mut buf = Vec::new();
2964
2965 follow_logs(&client, &base, "test", 100, None, &mut buf)
2966 .await
2967 .unwrap();
2968
2969 let output = String::from_utf8(buf).unwrap();
2970 assert!(output.contains("line1"));
2972 assert!(output.contains("line2"));
2973 assert!(output.contains("line3"));
2974 assert!(output.contains("line4"));
2975 assert!(output.contains("[pulpo] Agent exited"));
2976 }
2977
2978 #[tokio::test]
2979 async fn test_execute_logs_follow_success() {
2980 let base = start_follow_test_server().await;
2981 let node = base.strip_prefix("http://").unwrap().to_owned();
2983
2984 let cli = Cli {
2985 node,
2986 token: None,
2987 command: Some(Commands::Logs {
2988 name: "test".into(),
2989 lines: 100,
2990 follow: true,
2991 }),
2992 path: None,
2993 };
2994 let result = execute(&cli).await.unwrap();
2996 assert_eq!(result, "");
2997 }
2998
2999 #[tokio::test]
3000 async fn test_execute_logs_follow_connection_refused() {
3001 let cli = Cli {
3002 node: "localhost:1".into(),
3003 token: None,
3004 command: Some(Commands::Logs {
3005 name: "test".into(),
3006 lines: 50,
3007 follow: true,
3008 }),
3009 path: None,
3010 };
3011 let result = execute(&cli).await;
3012 let err = result.unwrap_err().to_string();
3013 assert!(
3014 err.contains("Could not connect to pulpod"),
3015 "Expected friendly error, got: {err}"
3016 );
3017 }
3018
3019 #[tokio::test]
3020 async fn test_follow_logs_exits_on_dead() {
3021 use axum::{Router, extract::Path, extract::Query, routing::get};
3022
3023 let app = Router::new()
3024 .route(
3025 "/api/v1/sessions/{id}/output",
3026 get(
3027 |_path: Path<String>,
3028 _query: Query<std::collections::HashMap<String, String>>| async {
3029 r#"{"output":"some output"}"#.to_owned()
3030 },
3031 ),
3032 )
3033 .route(
3034 "/api/v1/sessions/{id}",
3035 get(|_path: Path<String>| async {
3036 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"killed","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3037 }),
3038 );
3039
3040 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3041 let addr = listener.local_addr().unwrap();
3042 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3043 let base = format!("http://127.0.0.1:{}", addr.port());
3044
3045 let client = reqwest::Client::new();
3046 let mut buf = Vec::new();
3047 follow_logs(&client, &base, "test", 100, None, &mut buf)
3048 .await
3049 .unwrap();
3050
3051 let output = String::from_utf8(buf).unwrap();
3052 assert!(output.contains("some output"));
3053 }
3054
3055 #[tokio::test]
3056 async fn test_follow_logs_exits_on_stale() {
3057 use axum::{Router, extract::Path, extract::Query, routing::get};
3058
3059 let app = Router::new()
3060 .route(
3061 "/api/v1/sessions/{id}/output",
3062 get(
3063 |_path: Path<String>,
3064 _query: Query<std::collections::HashMap<String, String>>| async {
3065 r#"{"output":"stale output"}"#.to_owned()
3066 },
3067 ),
3068 )
3069 .route(
3070 "/api/v1/sessions/{id}",
3071 get(|_path: Path<String>| async {
3072 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","command":"echo test","description":null,"status":"lost","exit_code":null,"backend_session_id":null,"output_snapshot":null,"metadata":null,"ink":null,"intervention_code":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"idle_since":null,"idle_threshold_secs":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3073 }),
3074 );
3075
3076 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3077 let addr = listener.local_addr().unwrap();
3078 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3079 let base = format!("http://127.0.0.1:{}", addr.port());
3080
3081 let client = reqwest::Client::new();
3082 let mut buf = Vec::new();
3083 follow_logs(&client, &base, "test", 100, None, &mut buf)
3084 .await
3085 .unwrap();
3086
3087 let output = String::from_utf8(buf).unwrap();
3088 assert!(output.contains("stale output"));
3089 }
3090
3091 #[tokio::test]
3092 async fn test_execute_logs_follow_non_reqwest_error() {
3093 use axum::{Router, extract::Path, extract::Query, routing::get};
3094
3095 let app = Router::new()
3097 .route(
3098 "/api/v1/sessions/{id}/output",
3099 get(
3100 |_path: Path<String>,
3101 _query: Query<std::collections::HashMap<String, String>>| async {
3102 r#"{"output":"initial"}"#.to_owned()
3103 },
3104 ),
3105 )
3106 .route(
3107 "/api/v1/sessions/{id}",
3108 get(|_path: Path<String>| async { "not valid json".to_owned() }),
3109 );
3110
3111 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3112 let addr = listener.local_addr().unwrap();
3113 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3114 let node = format!("127.0.0.1:{}", addr.port());
3115
3116 let cli = Cli {
3117 node,
3118 token: None,
3119 command: Some(Commands::Logs {
3120 name: "test".into(),
3121 lines: 100,
3122 follow: true,
3123 }),
3124 path: None,
3125 };
3126 let err = execute(&cli).await.unwrap_err();
3127 let msg = err.to_string();
3129 assert!(
3130 msg.contains("expected ident"),
3131 "Expected serde parse error, got: {msg}"
3132 );
3133 }
3134
3135 #[tokio::test]
3136 async fn test_fetch_session_status_connection_error() {
3137 let client = reqwest::Client::new();
3138 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3139 assert!(result.is_err());
3140 }
3141
3142 #[test]
3145 fn test_build_crontab_line() {
3146 let line = build_crontab_line(
3147 "nightly-review",
3148 "0 3 * * *",
3149 "/home/me/repo",
3150 "claude -p 'Review PRs'",
3151 "localhost:7433",
3152 );
3153 assert_eq!(
3154 line,
3155 "0 3 * * * pulpo --node localhost:7433 spawn nightly-review --workdir /home/me/repo -- claude -p 'Review PRs' #pulpo:nightly-review\n"
3156 );
3157 }
3158
3159 #[test]
3160 fn test_crontab_install_success() {
3161 let crontab = "# existing cron\n0 * * * * echo hi\n";
3162 let line = "0 3 * * * pulpo --node n spawn my-job --workdir /tmp -- claude -p task #pulpo:my-job\n";
3163 let result = crontab_install(crontab, "my-job", line).unwrap();
3164 assert!(result.starts_with("# existing cron\n"));
3165 assert!(result.ends_with("#pulpo:my-job\n"));
3166 assert!(result.contains("echo hi"));
3167 }
3168
3169 #[test]
3170 fn test_crontab_install_duplicate_error() {
3171 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3172 let line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
3173 let err = crontab_install(crontab, "my-job", line).unwrap_err();
3174 assert!(err.to_string().contains("already exists"));
3175 }
3176
3177 #[test]
3178 fn test_crontab_list_empty() {
3179 assert_eq!(crontab_list(""), "No pulpo schedules.");
3180 }
3181
3182 #[test]
3183 fn test_crontab_list_no_pulpo_entries() {
3184 assert_eq!(crontab_list("0 * * * * echo hi\n"), "No pulpo schedules.");
3185 }
3186
3187 #[test]
3188 fn test_crontab_list_with_entries() {
3189 let crontab = "0 3 * * * pulpo --node n spawn nightly --workdir /tmp -- claude -p task #pulpo:nightly\n";
3190 let output = crontab_list(crontab);
3191 assert!(output.contains("NAME"));
3192 assert!(output.contains("CRON"));
3193 assert!(output.contains("PAUSED"));
3194 assert!(output.contains("nightly"));
3195 assert!(output.contains("0 3 * * *"));
3196 assert!(output.contains("no"));
3197 }
3198
3199 #[test]
3200 fn test_crontab_list_paused_entry() {
3201 let crontab = "#0 3 * * * pulpo spawn task #pulpo:paused-job\n";
3202 let output = crontab_list(crontab);
3203 assert!(output.contains("paused-job"));
3204 assert!(output.contains("yes"));
3205 }
3206
3207 #[test]
3208 fn test_crontab_list_short_line() {
3209 let crontab = "badcron #pulpo:broken\n";
3211 let output = crontab_list(crontab);
3212 assert!(output.contains("broken"));
3213 assert!(output.contains('?'));
3214 }
3215
3216 #[test]
3217 fn test_crontab_remove_success() {
3218 let crontab = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
3219 let result = crontab_remove(crontab, "my-job").unwrap();
3220 assert!(result.contains("echo hi"));
3221 assert!(!result.contains("my-job"));
3222 }
3223
3224 #[test]
3225 fn test_crontab_remove_not_found() {
3226 let crontab = "0 * * * * echo hi\n";
3227 let err = crontab_remove(crontab, "ghost").unwrap_err();
3228 assert!(err.to_string().contains("not found"));
3229 }
3230
3231 #[test]
3232 fn test_crontab_pause_success() {
3233 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3234 let result = crontab_pause(crontab, "my-job").unwrap();
3235 assert!(result.starts_with('#'));
3236 assert!(result.contains("#pulpo:my-job"));
3237 }
3238
3239 #[test]
3240 fn test_crontab_pause_not_found() {
3241 let crontab = "0 * * * * echo hi\n";
3242 let err = crontab_pause(crontab, "ghost").unwrap_err();
3243 assert!(err.to_string().contains("not found or already paused"));
3244 }
3245
3246 #[test]
3247 fn test_crontab_pause_already_paused() {
3248 let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3249 let err = crontab_pause(crontab, "my-job").unwrap_err();
3250 assert!(err.to_string().contains("already paused"));
3251 }
3252
3253 #[test]
3254 fn test_crontab_resume_success() {
3255 let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3256 let result = crontab_resume(crontab, "my-job").unwrap();
3257 assert!(!result.starts_with('#'));
3258 assert!(result.contains("#pulpo:my-job"));
3259 }
3260
3261 #[test]
3262 fn test_crontab_resume_not_found() {
3263 let crontab = "0 * * * * echo hi\n";
3264 let err = crontab_resume(crontab, "ghost").unwrap_err();
3265 assert!(err.to_string().contains("not found or not paused"));
3266 }
3267
3268 #[test]
3269 fn test_crontab_resume_not_paused() {
3270 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3271 let err = crontab_resume(crontab, "my-job").unwrap_err();
3272 assert!(err.to_string().contains("not paused"));
3273 }
3274
3275 #[test]
3278 fn test_cli_parse_schedule_install() {
3279 let cli = Cli::try_parse_from([
3280 "pulpo",
3281 "schedule",
3282 "install",
3283 "nightly",
3284 "0 3 * * *",
3285 "--workdir",
3286 "/tmp/repo",
3287 "--",
3288 "claude",
3289 "-p",
3290 "Review PRs",
3291 ])
3292 .unwrap();
3293 assert!(matches!(
3294 &cli.command,
3295 Some(Commands::Schedule {
3296 action: ScheduleAction::Install { name, cron, workdir, command }
3297 }) if name == "nightly" && cron == "0 3 * * *" && workdir == "/tmp/repo"
3298 && command == &["claude", "-p", "Review PRs"]
3299 ));
3300 }
3301
3302 #[test]
3303 fn test_cli_parse_schedule_list() {
3304 let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3305 assert!(matches!(
3306 &cli.command,
3307 Some(Commands::Schedule {
3308 action: ScheduleAction::List
3309 })
3310 ));
3311 }
3312
3313 #[test]
3314 fn test_cli_parse_schedule_remove() {
3315 let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3316 assert!(matches!(
3317 &cli.command,
3318 Some(Commands::Schedule {
3319 action: ScheduleAction::Remove { name }
3320 }) if name == "nightly"
3321 ));
3322 }
3323
3324 #[test]
3325 fn test_cli_parse_schedule_pause() {
3326 let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3327 assert!(matches!(
3328 &cli.command,
3329 Some(Commands::Schedule {
3330 action: ScheduleAction::Pause { name }
3331 }) if name == "nightly"
3332 ));
3333 }
3334
3335 #[test]
3336 fn test_cli_parse_schedule_resume() {
3337 let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3338 assert!(matches!(
3339 &cli.command,
3340 Some(Commands::Schedule {
3341 action: ScheduleAction::Resume { name }
3342 }) if name == "nightly"
3343 ));
3344 }
3345
3346 #[test]
3347 fn test_cli_parse_schedule_alias() {
3348 let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3349 assert!(matches!(
3350 &cli.command,
3351 Some(Commands::Schedule {
3352 action: ScheduleAction::List
3353 })
3354 ));
3355 }
3356
3357 #[test]
3358 fn test_cli_parse_schedule_list_alias() {
3359 let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3360 assert!(matches!(
3361 &cli.command,
3362 Some(Commands::Schedule {
3363 action: ScheduleAction::List
3364 })
3365 ));
3366 }
3367
3368 #[test]
3369 fn test_cli_parse_schedule_remove_alias() {
3370 let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3371 assert!(matches!(
3372 &cli.command,
3373 Some(Commands::Schedule {
3374 action: ScheduleAction::Remove { name }
3375 }) if name == "nightly"
3376 ));
3377 }
3378
3379 #[tokio::test]
3380 async fn test_execute_schedule_via_execute() {
3381 let node = start_test_server().await;
3383 let cli = Cli {
3384 node,
3385 token: None,
3386 command: Some(Commands::Schedule {
3387 action: ScheduleAction::List,
3388 }),
3389 path: None,
3390 };
3391 let result = execute(&cli).await;
3392 assert!(result.is_ok() || result.is_err());
3394 }
3395
3396 #[test]
3397 fn test_schedule_action_debug() {
3398 let action = ScheduleAction::List;
3399 assert_eq!(format!("{action:?}"), "List");
3400 }
3401
3402 #[test]
3403 fn test_cli_parse_send_alias() {
3404 let cli = Cli::try_parse_from(["pulpo", "send", "my-session", "y"]).unwrap();
3405 assert!(matches!(
3406 &cli.command,
3407 Some(Commands::Input { name, text }) if name == "my-session" && text.as_deref() == Some("y")
3408 ));
3409 }
3410
3411 #[test]
3412 fn test_cli_parse_spawn_no_name() {
3413 let cli = Cli::try_parse_from(["pulpo", "spawn"]).unwrap();
3414 assert!(matches!(
3415 &cli.command,
3416 Some(Commands::Spawn { name, command, .. }) if name.is_none() && command.is_empty()
3417 ));
3418 }
3419
3420 #[test]
3421 fn test_cli_parse_spawn_optional_name_with_command() {
3422 let cli = Cli::try_parse_from(["pulpo", "spawn", "--", "echo", "hello"]).unwrap();
3423 assert!(matches!(
3424 &cli.command,
3425 Some(Commands::Spawn { name, command, .. })
3426 if name.is_none() && command == &["echo", "hello"]
3427 ));
3428 }
3429
3430 #[test]
3431 fn test_cli_parse_path_shortcut() {
3432 let cli = Cli::try_parse_from(["pulpo", "/tmp/my-repo"]).unwrap();
3433 assert!(cli.command.is_none());
3434 assert_eq!(cli.path.as_deref(), Some("/tmp/my-repo"));
3435 }
3436
3437 #[test]
3438 fn test_cli_parse_no_args() {
3439 let cli = Cli::try_parse_from(["pulpo"]).unwrap();
3440 assert!(cli.command.is_none());
3441 assert!(cli.path.is_none());
3442 }
3443
3444 #[test]
3445 fn test_derive_session_name_simple() {
3446 assert_eq!(derive_session_name("/home/user/my-repo"), "my-repo");
3447 }
3448
3449 #[test]
3450 fn test_derive_session_name_with_special_chars() {
3451 assert_eq!(derive_session_name("/home/user/My Repo_v2"), "my-repo-v2");
3452 }
3453
3454 #[test]
3455 fn test_derive_session_name_root() {
3456 assert_eq!(derive_session_name("/"), "session");
3457 }
3458
3459 #[test]
3460 fn test_derive_session_name_dots() {
3461 assert_eq!(derive_session_name("/home/user/.hidden"), "hidden");
3462 }
3463
3464 #[test]
3465 fn test_resolve_path_absolute() {
3466 assert_eq!(resolve_path("/tmp/repo"), "/tmp/repo");
3467 }
3468
3469 #[test]
3470 fn test_resolve_path_relative() {
3471 let resolved = resolve_path("my-repo");
3472 assert!(resolved.ends_with("my-repo"));
3473 assert!(resolved.starts_with('/'));
3474 }
3475
3476 #[tokio::test]
3477 async fn test_execute_path_shortcut() {
3478 let node = start_test_server().await;
3479 let cli = Cli {
3480 node,
3481 token: None,
3482 path: Some("/tmp".into()),
3483 command: None,
3484 };
3485 let result = execute(&cli).await.unwrap();
3486 assert!(result.contains("Detached from session"));
3487 }
3488
3489 #[test]
3492 fn test_node_needs_resolution() {
3493 assert!(!node_needs_resolution("localhost:7433"));
3494 assert!(!node_needs_resolution("mac-mini:7433"));
3495 assert!(!node_needs_resolution("10.0.0.1:7433"));
3496 assert!(!node_needs_resolution("[::1]:7433"));
3497 assert!(node_needs_resolution("mac-mini"));
3498 assert!(node_needs_resolution("linux-server"));
3499 assert!(node_needs_resolution("localhost"));
3500 }
3501
3502 #[tokio::test]
3503 async fn test_resolve_node_with_port() {
3504 let client = reqwest::Client::new();
3505 let (addr, token) = resolve_node(&client, "mac-mini:7433").await;
3506 assert_eq!(addr, "mac-mini:7433");
3507 assert!(token.is_none());
3508 }
3509
3510 #[tokio::test]
3511 async fn test_resolve_node_fallback_appends_port() {
3512 let client = reqwest::Client::new();
3515 let (addr, token) = resolve_node(&client, "unknown-host").await;
3516 assert_eq!(addr, "unknown-host:7433");
3517 assert!(token.is_none());
3518 }
3519
3520 #[cfg(not(coverage))]
3521 #[tokio::test]
3522 async fn test_resolve_node_finds_peer() {
3523 use axum::{Router, routing::get};
3524
3525 let app = Router::new()
3526 .route(
3527 "/api/v1/peers",
3528 get(|| async {
3529 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"mac-mini","address":"10.0.0.5:7433","status":"online","node_info":null,"session_count":2,"source":"configured"}]}"#.to_owned()
3530 }),
3531 )
3532 .route(
3533 "/api/v1/config",
3534 get(|| async {
3535 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"mac-mini":{"address":"10.0.0.5:7433","token":"peer-secret"}},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
3536 }),
3537 );
3538
3539 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
3541 return;
3542 };
3543 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3544
3545 let client = reqwest::Client::new();
3546 let (addr, token) = resolve_node(&client, "mac-mini").await;
3547 assert_eq!(addr, "10.0.0.5:7433");
3548 assert_eq!(token, Some("peer-secret".into()));
3549 }
3550
3551 #[cfg(not(coverage))]
3552 #[tokio::test]
3553 async fn test_resolve_node_peer_no_token() {
3554 use axum::{Router, routing::get};
3555
3556 let app = Router::new()
3557 .route(
3558 "/api/v1/peers",
3559 get(|| async {
3560 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[{"name":"test-peer","address":"10.0.0.9:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
3561 }),
3562 )
3563 .route(
3564 "/api/v1/config",
3565 get(|| async {
3566 r#"{"node":{"name":"local","port":7433,"data_dir":"/tmp","bind":"local","tag":null,"seed":null,"discovery_interval_secs":30},"auth":{},"peers":{"test-peer":"10.0.0.9:7433"},"watchdog":{"enabled":true,"memory_threshold":90,"check_interval_secs":10,"breach_count":3,"idle_timeout_secs":600,"idle_action":"alert","idle_threshold_secs":60},"notifications":{"discord":null,"webhooks":[]},"inks":{}}"#.to_owned()
3567 }),
3568 );
3569
3570 let Ok(listener) = tokio::net::TcpListener::bind("127.0.0.1:7433").await else {
3571 return; };
3573 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3574
3575 let client = reqwest::Client::new();
3576 let (addr, token) = resolve_node(&client, "test-peer").await;
3577 assert_eq!(addr, "10.0.0.9:7433");
3578 assert!(token.is_none()); }
3580
3581 #[tokio::test]
3582 async fn test_execute_with_peer_name_resolution() {
3583 let cli = Cli {
3587 node: "nonexistent-peer".into(),
3588 token: None,
3589 command: Some(Commands::List),
3590 path: None,
3591 };
3592 let result = execute(&cli).await;
3593 assert!(result.is_err());
3595 }
3596
3597 #[test]
3600 fn test_cli_parse_spawn_auto() {
3601 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto"]).unwrap();
3602 assert!(matches!(
3603 &cli.command,
3604 Some(Commands::Spawn { auto, .. }) if *auto
3605 ));
3606 }
3607
3608 #[test]
3609 fn test_cli_parse_spawn_auto_default() {
3610 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task"]).unwrap();
3611 assert!(matches!(
3612 &cli.command,
3613 Some(Commands::Spawn { auto, .. }) if !auto
3614 ));
3615 }
3616
3617 #[tokio::test]
3618 async fn test_select_best_node_coverage_stub() {
3619 let client = reqwest::Client::new();
3621 let _result = select_best_node(&client, "http://127.0.0.1:19999", None).await;
3624 }
3625
3626 #[cfg(not(coverage))]
3627 #[tokio::test]
3628 async fn test_select_best_node_picks_least_loaded() {
3629 use axum::{Router, routing::get};
3630
3631 let app = Router::new().route(
3632 "/api/v1/peers",
3633 get(|| async {
3634 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"busy","address":"busy:7433","status":"online","node_info":{"name":"busy","hostname":"h","os":"linux","arch":"x86_64","cpus":4,"memory_mb":8192,"gpu":null},"session_count":5,"source":"configured"},{"name":"idle","address":"idle:7433","status":"online","node_info":{"name":"idle","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":16384,"gpu":null},"session_count":1,"source":"configured"}]}"#.to_owned()
3635 }),
3636 );
3637 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3638 let addr = listener.local_addr().unwrap();
3639 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3640 let base = format!("http://127.0.0.1:{}", addr.port());
3641
3642 let client = reqwest::Client::new();
3643 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3644 assert_eq!(name, "idle");
3648 assert_eq!(addr, "idle:7433");
3649 }
3650
3651 #[cfg(not(coverage))]
3652 #[tokio::test]
3653 async fn test_select_best_node_no_online_peers_falls_back_to_local() {
3654 use axum::{Router, routing::get};
3655
3656 let app = Router::new().route(
3657 "/api/v1/peers",
3658 get(|| async {
3659 r#"{"local":{"name":"my-mac","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"offline-peer","address":"offline:7433","status":"offline","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
3660 }),
3661 );
3662 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3663 let addr = listener.local_addr().unwrap();
3664 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3665 let base = format!("http://127.0.0.1:{}", addr.port());
3666
3667 let client = reqwest::Client::new();
3668 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3669 assert_eq!(name, "my-mac");
3670 assert_eq!(addr, "localhost:7433");
3671 }
3672
3673 #[cfg(not(coverage))]
3674 #[tokio::test]
3675 async fn test_select_best_node_empty_peers_falls_back_to_local() {
3676 use axum::{Router, routing::get};
3677
3678 let app = Router::new().route(
3679 "/api/v1/peers",
3680 get(|| async {
3681 r#"{"local":{"name":"solo","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[]}"#.to_owned()
3682 }),
3683 );
3684 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3685 let addr = listener.local_addr().unwrap();
3686 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3687 let base = format!("http://127.0.0.1:{}", addr.port());
3688
3689 let client = reqwest::Client::new();
3690 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3691 assert_eq!(name, "solo");
3692 assert_eq!(addr, "localhost:7433");
3693 }
3694
3695 #[cfg(not(coverage))]
3696 #[tokio::test]
3697 async fn test_execute_spawn_auto_selects_node() {
3698 use axum::{
3699 Router,
3700 http::StatusCode,
3701 routing::{get, post},
3702 };
3703
3704 let create_json = test_create_response_json();
3705
3706 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3708 let addr = listener.local_addr().unwrap();
3709 let node = format!("127.0.0.1:{}", addr.port());
3710 let peer_addr = node.clone();
3711
3712 let app = Router::new()
3713 .route(
3714 "/api/v1/peers",
3715 get(move || {
3716 let peer_addr = peer_addr.clone();
3717 async move {
3718 format!(
3719 r#"{{"local":{{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null}},"peers":[{{"name":"remote","address":"{peer_addr}","status":"online","node_info":{{"name":"remote","hostname":"h","os":"linux","arch":"x86_64","cpus":8,"memory_mb":32768,"gpu":null}},"session_count":0,"source":"configured"}}]}}"#
3720 )
3721 }
3722 }),
3723 )
3724 .route(
3725 "/api/v1/sessions",
3726 post(move || async move {
3727 (StatusCode::CREATED, create_json.clone())
3728 }),
3729 );
3730
3731 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3732
3733 let cli = Cli {
3734 node,
3735 token: None,
3736 command: Some(Commands::Spawn {
3737 name: Some("test".into()),
3738 workdir: Some("/tmp/repo".into()),
3739 ink: None,
3740 description: None,
3741 detach: true,
3742 idle_threshold: None,
3743 auto: true,
3744 command: vec!["echo".into(), "hello".into()],
3745 }),
3746 path: None,
3747 };
3748 let result = execute(&cli).await.unwrap();
3749 assert!(result.contains("Created session"));
3750 }
3751
3752 #[cfg(not(coverage))]
3753 #[tokio::test]
3754 async fn test_select_best_node_peer_no_session_count() {
3755 use axum::{Router, routing::get};
3756
3757 let app = Router::new().route(
3758 "/api/v1/peers",
3759 get(|| async {
3760 r#"{"local":{"name":"local","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":16384,"gpu":null},"peers":[{"name":"fresh","address":"fresh:7433","status":"online","node_info":null,"session_count":null,"source":"configured"}]}"#.to_owned()
3761 }),
3762 );
3763 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3764 let addr = listener.local_addr().unwrap();
3765 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3766 let base = format!("http://127.0.0.1:{}", addr.port());
3767
3768 let client = reqwest::Client::new();
3769 let (addr, name) = select_best_node(&client, &base, None).await.unwrap();
3770 assert_eq!(name, "fresh");
3772 assert_eq!(addr, "fresh:7433");
3773 }
3774}