1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4 AuthTokenResponse, CreateSessionResponse, CultureDeleteResponse, CultureItemResponse,
5 CulturePushResponse, CultureResponse, InterventionEventResponse, PeersResponse,
6 ProvidersResponse,
7};
8#[cfg(test)]
9use pulpo_common::api::{ProviderCapabilitiesResponse, ProviderInfoResponse};
10use pulpo_common::culture::Culture;
11use pulpo_common::session::Session;
12
// Top-level command-line arguments of the `pulpo` client, parsed by clap.
// Plain `//` comments are used here on purpose: `///` doc comments on clap
// items would be picked up as help text and change the CLI output.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version
)]
pub struct Cli {
    // Address of the node to talk to; `base_url` prefixes it with `http://`.
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    // Explicit auth token. When omitted, `resolve_token` attempts discovery,
    // but only for nodes that `is_localhost` accepts.
    #[arg(long)]
    pub token: Option<String>,

    // The subcommand dispatched by `execute`.
    #[command(subcommand)]
    pub command: Commands,
}
31
// All subcommands understood by the client; `execute` handles each variant.
// Plain `//` comments are used so clap's generated help text is unchanged.
#[derive(Subcommand, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    // Attach to a session's terminal. `execute` refuses sessions whose status
    // is "lost", "finished" or "killed".
    #[command(visible_alias = "a")]
    Attach {
        // Session name used in the `/api/v1/sessions/{name}` lookup.
        name: String,
    },

    // Send text to a session's input endpoint.
    #[command(visible_alias = "i")]
    Input {
        // Target session name.
        name: String,
        // Text to send; `execute` sends "\n" when this is omitted.
        text: Option<String>,
    },

    // Create a new session via POST `/api/v1/sessions`.
    #[command(visible_alias = "s")]
    Spawn {
        // Working directory; `execute` falls back to the current directory
        // (or "." if that cannot be determined).
        #[arg(long)]
        workdir: Option<String>,

        // Optional session name, sent as `name`.
        #[arg(long)]
        name: Option<String>,

        // Optional provider, sent as `provider`.
        #[arg(long)]
        provider: Option<String>,

        // Selects mode "autonomous" instead of "interactive".
        #[arg(long)]
        auto: bool,

        // When set, `unrestricted: true` is added to the request body.
        #[arg(long)]
        unrestricted: bool,

        // Optional model, sent as `model`.
        #[arg(long)]
        model: Option<String>,

        // Optional system prompt, sent as `system_prompt`.
        #[arg(long)]
        system_prompt: Option<String>,

        // Comma-separated list, sent as the `allowed_tools` array.
        #[arg(long, value_delimiter = ',')]
        allowed_tools: Option<Vec<String>>,

        // Sent verbatim as the `ink` field of the request.
        #[arg(long)]
        ink: Option<String>,

        // Sent as `max_turns`.
        #[arg(long)]
        max_turns: Option<u32>,

        // Sent as `max_budget_usd`.
        #[arg(long)]
        max_budget: Option<f64>,

        // Sent as `output_format`.
        #[arg(long)]
        output_format: Option<String>,

        // When set, `worktree: true` is added to the request body.
        #[arg(long)]
        worktree: bool,

        // Sent as `conversation_id`.
        #[arg(long)]
        conversation_id: Option<String>,

        // Remaining positional words; joined with single spaces into `prompt`
        // (omitted from the request when empty).
        prompt: Vec<String>,
    },

    // List sessions (GET `/api/v1/sessions`).
    #[command(visible_alias = "ls")]
    List,

    // Print, or follow, a session's captured output.
    #[command(visible_alias = "l")]
    Logs {
        // Session name.
        name: String,

        // Value of the `lines` query parameter of the output endpoint.
        #[arg(long, default_value = "100")]
        lines: usize,

        // Keep polling (see `follow_logs`) until the session reaches a
        // terminal status.
        #[arg(short, long)]
        follow: bool,
    },

    // Kill a session (POST `/api/v1/sessions/{name}/kill`).
    #[command(visible_alias = "k")]
    Kill {
        // Session name.
        name: String,
    },

    // Delete a session (DELETE `/api/v1/sessions/{name}`).
    #[command(visible_alias = "rm")]
    Delete {
        // Session name.
        name: String,
    },

    // Resume a session (POST `/api/v1/sessions/{name}/resume`).
    #[command(visible_alias = "r")]
    Resume {
        // Session name.
        name: String,
    },

    // Show the local node and its peers (GET `/api/v1/peers`).
    #[command(visible_alias = "n")]
    Nodes,

    // List intervention events recorded for a session.
    #[command(visible_alias = "iv")]
    Interventions {
        // Session name.
        name: String,
    },

    // List providers and their capabilities (GET `/api/v1/providers`).
    #[command(visible_alias = "p")]
    Providers,

    // Open the node's base URL in a browser.
    Ui,

    // Query or manage culture items. `execute` checks `get`, then `delete`,
    // then `push`; only if none is given does it run a listing query.
    #[command(visible_alias = "kn")]
    Culture {
        // Listing filter, sent as `session_id` (ignored with `--context`).
        #[arg(long)]
        session: Option<String>,

        // Listing filter, sent as `kind` (ignored with `--context`).
        #[arg(long)]
        kind: Option<String>,

        // Sent as `repo` for listings, or as `workdir` with `--context`.
        #[arg(long)]
        repo: Option<String>,

        // Sent as `ink` in both query modes.
        #[arg(long)]
        ink: Option<String>,

        // Sent as the `limit` query parameter.
        #[arg(long, default_value = "20")]
        limit: usize,

        // Query `/api/v1/culture/context` instead of `/api/v1/culture`.
        #[arg(long)]
        context: bool,

        // Fetch a single item by id.
        #[arg(long)]
        get: Option<String>,

        // Delete a single item by id.
        #[arg(long)]
        delete: Option<String>,

        // POST to `/api/v1/culture/push` and print the returned message.
        #[arg(long)]
        push: bool,
    },

    // Manage crontab-backed schedules (see `execute_schedule`).
    #[command(visible_alias = "sched")]
    Schedule {
        #[command(subcommand)]
        action: ScheduleAction,
    },
}
219
// Actions of the `schedule` subcommand; each one edits or lists the user's
// crontab through `execute_schedule`.
// Plain `//` comments are used so clap's generated help text is unchanged.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    // Append a new crontab line that runs `pulpo ... spawn ... --auto`.
    Install {
        // Schedule name; stored in the trailing `#pulpo:<name>` tag.
        name: String,
        // Cron expression placed verbatim at the start of the crontab line.
        cron: String,
        // Working directory passed to the spawned session.
        #[arg(long)]
        workdir: String,
        // Provider passed to the spawned session.
        #[arg(long, default_value = "claude")]
        provider: String,
        // Prompt words; joined with single spaces before being written.
        prompt: Vec<String>,
    },
    // List crontab lines carrying the pulpo tag.
    #[command(alias = "ls")]
    List,
    // Remove the schedule with the given name.
    #[command(alias = "rm")]
    Remove {
        name: String,
    },
    // Comment out the schedule's crontab line.
    Pause {
        name: String,
    },
    // Uncomment a previously paused schedule.
    Resume {
        name: String,
    },
}
257
/// Builds the HTTP base URL for a node address.
///
/// `node` is used verbatim (no validation or normalisation); the result is
/// simply `http://` followed by `node`.
pub fn base_url(node: &str) -> String {
    const SCHEME: &str = "http://";
    let mut url = String::with_capacity(SCHEME.len() + node.len());
    url.push_str(SCHEME);
    url.push_str(node);
    url
}
262
/// JSON body of the session output endpoint, as deserialized by `fetch_output`.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// The session output text returned by the server.
    output: String,
}
268
269fn format_sessions(sessions: &[Session]) -> String {
271 if sessions.is_empty() {
272 return "No sessions.".into();
273 }
274 let mut lines = vec![format!(
275 "{:<20} {:<12} {:<10} {:<14} {}",
276 "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
277 )];
278 for s in sessions {
279 let prompt_display = if s.prompt.len() > 40 {
280 format!("{}...", &s.prompt[..37])
281 } else {
282 s.prompt.clone()
283 };
284 let status_display = s.status.to_string();
285 lines.push(format!(
286 "{:<20} {:<12} {:<10} {:<14} {}",
287 s.name, status_display, s.provider, s.mode, prompt_display
288 ));
289 }
290 lines.join("\n")
291}
292
293fn format_nodes(resp: &PeersResponse) -> String {
295 let mut lines = vec![format!(
296 "{:<20} {:<25} {:<10} {}",
297 "NAME", "ADDRESS", "STATUS", "SESSIONS"
298 )];
299 lines.push(format!(
300 "{:<20} {:<25} {:<10} {}",
301 resp.local.name, "(local)", "online", "-"
302 ));
303 for p in &resp.peers {
304 let sessions = p
305 .session_count
306 .map_or_else(|| "-".into(), |c| c.to_string());
307 lines.push(format!(
308 "{:<20} {:<25} {:<10} {}",
309 p.name, p.address, p.status, sessions
310 ));
311 }
312 lines.join("\n")
313}
314
315fn format_interventions(events: &[InterventionEventResponse]) -> String {
317 if events.is_empty() {
318 return "No intervention events.".into();
319 }
320 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
321 for e in events {
322 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
323 }
324 lines.join("\n")
325}
326
327fn format_culture(items: &[Culture]) -> String {
329 if items.is_empty() {
330 return "No culture found.".into();
331 }
332 let mut lines = vec![format!(
333 "{:<10} {:<40} {:<10} {:<6} {}",
334 "KIND", "TITLE", "REPO", "REL", "TAGS"
335 )];
336 for k in items {
337 let title = if k.title.len() > 38 {
338 format!("{}…", &k.title[..37])
339 } else {
340 k.title.clone()
341 };
342 let repo = k
343 .scope_repo
344 .as_deref()
345 .and_then(|r| r.rsplit('/').next())
346 .unwrap_or("-");
347 let tags = k.tags.join(",");
348 lines.push(format!(
349 "{:<10} {:<40} {:<10} {:<6.2} {}",
350 k.kind, title, repo, k.relevance, tags
351 ));
352 }
353 lines.join("\n")
354}
355
356fn format_providers(resp: &ProvidersResponse) -> String {
358 let mut lines = vec![format!(
359 "{:<12} {:<10} {:<20} {}",
360 "PROVIDER", "AVAILABLE", "BINARY", "CAPABILITIES"
361 )];
362 for p in &resp.providers {
363 let avail = if p.available { "yes" } else { "no" };
364 let mut caps = Vec::new();
365 let c = &p.capabilities;
366 if c.model {
367 caps.push("model");
368 }
369 if c.system_prompt {
370 caps.push("system-prompt");
371 }
372 if c.allowed_tools {
373 caps.push("allowed-tools");
374 }
375 if c.max_turns {
376 caps.push("max-turns");
377 }
378 if c.max_budget_usd {
379 caps.push("max-budget");
380 }
381 if c.output_format {
382 caps.push("output-format");
383 }
384 if c.worktree {
385 caps.push("worktree");
386 }
387 if c.unrestricted {
388 caps.push("unrestricted");
389 }
390 if c.resume {
391 caps.push("resume");
392 }
393 let caps_str = if caps.is_empty() {
394 "-".to_owned()
395 } else {
396 caps.join(", ")
397 };
398 lines.push(format!(
399 "{:<12} {:<10} {:<20} {}",
400 p.provider, avail, p.binary, caps_str
401 ));
402 }
403 lines.join("\n")
404}
405
/// Builds (without running) the command that opens `url` in the default
/// browser: `open <url>` on macOS, `xdg-open <url>` on Linux and on every
/// other target.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    let opener = if cfg!(target_os = "macos") {
        "open"
    } else {
        "xdg-open"
    };
    let mut command = std::process::Command::new(opener);
    command.arg(url);
    command
}
429
430#[cfg(not(coverage))]
432fn open_browser(url: &str) -> Result<()> {
433 build_open_command(url).status()?;
434 Ok(())
435}
436
437#[cfg(coverage)]
439fn open_browser(_url: &str) -> Result<()> {
440 Ok(())
441}
442
/// Builds (without running) `tmux attach-session -t <backend_session_id>`.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    let mut attach = std::process::Command::new("tmux");
    attach
        .arg("attach-session")
        .arg("-t")
        .arg(backend_session_id);
    attach
}
451
452#[cfg(not(any(test, coverage)))]
454fn attach_session(backend_session_id: &str) -> Result<()> {
455 let status = build_attach_command(backend_session_id).status()?;
456 if !status.success() {
457 anyhow::bail!("attach failed with {status}");
458 }
459 Ok(())
460}
461
462#[cfg(any(test, coverage))]
464#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
465fn attach_session(_backend_session_id: &str) -> Result<()> {
466 Ok(())
467}
468
469fn api_error(text: &str) -> anyhow::Error {
471 serde_json::from_str::<serde_json::Value>(text)
472 .ok()
473 .and_then(|v| v["error"].as_str().map(String::from))
474 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
475}
476
477async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
479 if resp.status().is_success() {
480 Ok(resp.text().await?)
481 } else {
482 let text = resp.text().await?;
483 Err(api_error(&text))
484 }
485}
486
487fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
489 if err.is_connect() {
490 anyhow::anyhow!(
491 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
492 )
493 } else {
494 anyhow::anyhow!("Network error connecting to {node}: {err}")
495 }
496}
497
/// Reports whether `node` refers to the local machine.
///
/// Accepted forms: `localhost` or `127.0.0.1` (optionally followed by
/// `:port`), the bare IPv6 loopback `::1`, and anything starting with the
/// bracketed form `[::1]`.
fn is_localhost(node: &str) -> bool {
    if node == "::1" || node.starts_with("[::1]") {
        return true;
    }
    // Everything before the first ':' is treated as the host part.
    let host = node.split_once(':').map_or(node, |(host, _rest)| host);
    matches!(host, "localhost" | "127.0.0.1")
}
503
504async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
506 let resp = client
507 .get(format!("{base}/api/v1/auth/token"))
508 .send()
509 .await
510 .ok()?;
511 let body: AuthTokenResponse = resp.json().await.ok()?;
512 if body.token.is_empty() {
513 None
514 } else {
515 Some(body.token)
516 }
517}
518
519async fn resolve_token(
521 client: &reqwest::Client,
522 base: &str,
523 node: &str,
524 explicit: Option<&str>,
525) -> Option<String> {
526 if let Some(t) = explicit {
527 return Some(t.to_owned());
528 }
529 if is_localhost(node) {
530 return discover_token(client, base).await;
531 }
532 None
533}
534
535fn authed_get(
537 client: &reqwest::Client,
538 url: String,
539 token: Option<&str>,
540) -> reqwest::RequestBuilder {
541 let req = client.get(url);
542 if let Some(t) = token {
543 req.bearer_auth(t)
544 } else {
545 req
546 }
547}
548
549fn authed_post(
551 client: &reqwest::Client,
552 url: String,
553 token: Option<&str>,
554) -> reqwest::RequestBuilder {
555 let req = client.post(url);
556 if let Some(t) = token {
557 req.bearer_auth(t)
558 } else {
559 req
560 }
561}
562
563fn authed_delete(
565 client: &reqwest::Client,
566 url: String,
567 token: Option<&str>,
568) -> reqwest::RequestBuilder {
569 let req = client.delete(url);
570 if let Some(t) = token {
571 req.bearer_auth(t)
572 } else {
573 req
574 }
575}
576
577async fn fetch_output(
579 client: &reqwest::Client,
580 base: &str,
581 name: &str,
582 lines: usize,
583 token: Option<&str>,
584) -> Result<String> {
585 let resp = authed_get(
586 client,
587 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
588 token,
589 )
590 .send()
591 .await?;
592 let text = ok_or_api_error(resp).await?;
593 let output: OutputResponse = serde_json::from_str(&text)?;
594 Ok(output.output)
595}
596
597async fn fetch_session_status(
599 client: &reqwest::Client,
600 base: &str,
601 name: &str,
602 token: Option<&str>,
603) -> Result<String> {
604 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
605 .send()
606 .await?;
607 let text = ok_or_api_error(resp).await?;
608 let session: Session = serde_json::from_str(&text)?;
609 Ok(session.status.to_string())
610}
611
/// Returns the part of `new` that comes after the content already seen in
/// `prev`.
///
/// Both arguments are treated as sequences of lines. The function looks for
/// the last line of `prev` inside `new`, scanning from the end, and accepts a
/// position only if the lines leading up to it also match the tail of `prev`
/// (as many lines as are available on both sides). Everything in `new` after
/// that position is returned.
///
/// * `prev` empty: all of `new` is returned.
/// * `new` has no lines: `""` is returned.
/// * no overlap found: all of `new` is returned.
///
/// The returned slice starts exactly at the first unseen line for both `\n`
/// and `\r\n` line endings.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    let last_prev = match prev_lines.last() {
        Some(line) => *line,
        // Not reachable for a non-empty `prev`; treat everything as new.
        None => return new,
    };

    // Scan from the end so the most recent occurrence of the anchor wins.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Verify the lines before the anchor match the tail of `prev` too.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail != new_overlap {
            continue;
        }
        // `split_inclusive('\n')` yields the same lines as `lines()` but keeps
        // their terminators, so the summed lengths give the exact byte offset
        // of the first unseen line whether lines end in "\n" or "\r\n".
        let consumed: usize = new
            .split_inclusive('\n')
            .take(i + 1)
            .map(str::len)
            .sum();
        return &new[consumed..];
    }

    new
}
654
655async fn follow_logs(
657 client: &reqwest::Client,
658 base: &str,
659 name: &str,
660 lines: usize,
661 token: Option<&str>,
662 writer: &mut (dyn std::io::Write + Send),
663) -> Result<()> {
664 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
665 write!(writer, "{prev_output}")?;
666
667 loop {
668 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
669
670 let status = fetch_session_status(client, base, name, token).await?;
672 let is_terminal = status == "finished" || status == "killed" || status == "lost";
673
674 let new_output = fetch_output(client, base, name, lines, token).await?;
676
677 let diff = diff_output(&prev_output, &new_output);
678 if !diff.is_empty() {
679 write!(writer, "{diff}")?;
680 }
681 prev_output = new_output;
682
683 if is_terminal {
684 break;
685 }
686 }
687 Ok(())
688}
689
/// Marker that identifies crontab lines managed by pulpo; the schedule name is
/// appended directly after it (see `build_crontab_line`).
#[cfg_attr(coverage, allow(dead_code))]
const CRONTAB_TAG: &str = "#pulpo:";
694
695#[cfg(not(coverage))]
697fn read_crontab() -> Result<String> {
698 let output = std::process::Command::new("crontab").arg("-l").output()?;
699 if output.status.success() {
700 Ok(String::from_utf8_lossy(&output.stdout).to_string())
701 } else {
702 Ok(String::new())
703 }
704}
705
706#[cfg(not(coverage))]
708fn write_crontab(content: &str) -> Result<()> {
709 use std::io::Write;
710 let mut child = std::process::Command::new("crontab")
711 .arg("-")
712 .stdin(std::process::Stdio::piped())
713 .spawn()?;
714 child
715 .stdin
716 .as_mut()
717 .unwrap()
718 .write_all(content.as_bytes())?;
719 let status = child.wait()?;
720 if !status.success() {
721 anyhow::bail!("crontab write failed");
722 }
723 Ok(())
724}
725
726#[cfg_attr(coverage, allow(dead_code))]
728fn build_crontab_line(
729 name: &str,
730 cron: &str,
731 workdir: &str,
732 provider: &str,
733 prompt: &str,
734 node: &str,
735) -> String {
736 format!(
737 "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
738 )
739}
740
741#[cfg_attr(coverage, allow(dead_code))]
743fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
744 let tag = format!("{CRONTAB_TAG}{name}");
745 if crontab.contains(&tag) {
746 anyhow::bail!("schedule \"{name}\" already exists — remove it first");
747 }
748 let mut result = crontab.to_owned();
749 result.push_str(line);
750 Ok(result)
751}
752
753#[cfg_attr(coverage, allow(dead_code))]
755fn crontab_list(crontab: &str) -> String {
756 let entries: Vec<&str> = crontab
757 .lines()
758 .filter(|l| l.contains(CRONTAB_TAG))
759 .collect();
760 if entries.is_empty() {
761 return "No pulpo schedules.".into();
762 }
763 let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
764 for entry in entries {
765 let paused = entry.starts_with('#');
766 let raw = entry.trim_start_matches('#').trim();
767 let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
768 let parts: Vec<&str> = raw.splitn(6, ' ').collect();
769 let cron_expr = if parts.len() >= 5 {
770 parts[..5].join(" ")
771 } else {
772 "?".into()
773 };
774 lines.push(format!(
775 "{:<20} {:<15} {}",
776 name,
777 cron_expr,
778 if paused { "yes" } else { "no" }
779 ));
780 }
781 lines.join("\n")
782}
783
784#[cfg_attr(coverage, allow(dead_code))]
786fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
787 use std::fmt::Write;
788 let tag = format!("{CRONTAB_TAG}{name}");
789 let filtered =
790 crontab
791 .lines()
792 .filter(|l| !l.contains(&tag))
793 .fold(String::new(), |mut acc, l| {
794 writeln!(acc, "{l}").unwrap();
795 acc
796 });
797 if filtered.len() == crontab.len() {
798 anyhow::bail!("schedule \"{name}\" not found");
799 }
800 Ok(filtered)
801}
802
803#[cfg_attr(coverage, allow(dead_code))]
805fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
806 use std::fmt::Write;
807 let tag = format!("{CRONTAB_TAG}{name}");
808 let mut found = false;
809 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
810 if l.contains(&tag) && !l.starts_with('#') {
811 found = true;
812 writeln!(acc, "#{l}").unwrap();
813 } else {
814 writeln!(acc, "{l}").unwrap();
815 }
816 acc
817 });
818 if !found {
819 anyhow::bail!("schedule \"{name}\" not found or already paused");
820 }
821 Ok(updated)
822}
823
824#[cfg_attr(coverage, allow(dead_code))]
826fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
827 use std::fmt::Write;
828 let tag = format!("{CRONTAB_TAG}{name}");
829 let mut found = false;
830 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
831 if l.contains(&tag) && l.starts_with('#') {
832 found = true;
833 writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
834 } else {
835 writeln!(acc, "{l}").unwrap();
836 }
837 acc
838 });
839 if !found {
840 anyhow::bail!("schedule \"{name}\" not found or not paused");
841 }
842 Ok(updated)
843}
844
845#[cfg(not(coverage))]
847fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
848 match action {
849 ScheduleAction::Install {
850 name,
851 cron,
852 workdir,
853 provider,
854 prompt,
855 } => {
856 let crontab = read_crontab()?;
857 let joined_prompt = prompt.join(" ");
858 let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
859 let updated = crontab_install(&crontab, name, &line)?;
860 write_crontab(&updated)?;
861 Ok(format!("Installed schedule \"{name}\""))
862 }
863 ScheduleAction::List => {
864 let crontab = read_crontab()?;
865 Ok(crontab_list(&crontab))
866 }
867 ScheduleAction::Remove { name } => {
868 let crontab = read_crontab()?;
869 let updated = crontab_remove(&crontab, name)?;
870 write_crontab(&updated)?;
871 Ok(format!("Removed schedule \"{name}\""))
872 }
873 ScheduleAction::Pause { name } => {
874 let crontab = read_crontab()?;
875 let updated = crontab_pause(&crontab, name)?;
876 write_crontab(&updated)?;
877 Ok(format!("Paused schedule \"{name}\""))
878 }
879 ScheduleAction::Resume { name } => {
880 let crontab = read_crontab()?;
881 let updated = crontab_resume(&crontab, name)?;
882 write_crontab(&updated)?;
883 Ok(format!("Resumed schedule \"{name}\""))
884 }
885 }
886}
887
888#[cfg(coverage)]
890fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
891 Ok(String::new())
892}
893
/// Executes the parsed command line against the configured node and returns
/// the text to print.
///
/// An auth token is resolved once up front (explicit `--token`, otherwise
/// discovery for local nodes) and attached to every API request. Transport
/// failures are converted into user-facing messages via `friendly_error`;
/// non-success responses become errors via `ok_or_api_error`.
///
/// # Errors
/// Fails when a request cannot be sent, the server answers with a non-success
/// status, a response body cannot be decoded, or a local action (attach,
/// browser launch, crontab edit) fails.
#[allow(clippy::too_many_lines)]
pub async fn execute(cli: &Cli) -> Result<String> {
    let url = base_url(&cli.node);
    let client = reqwest::Client::new();
    let node = &cli.node;
    // Note: the token is resolved for every command, including those that do
    // not call the API afterwards (`Ui`, `Schedule`).
    let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;

    match &cli.command {
        Commands::Attach { name } => {
            // Look the session up first so unusable sessions get a clear message.
            let resp = authed_get(
                &client,
                format!("{url}/api/v1/sessions/{name}"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let session: Session = serde_json::from_str(&text)?;
            match session.status.to_string().as_str() {
                "lost" => {
                    anyhow::bail!(
                        "Session \"{name}\" is lost (agent process died). Resume it first:\n  pulpo resume {name}"
                    );
                }
                "finished" | "killed" => {
                    anyhow::bail!(
                        "Session \"{name}\" is {} — cannot attach to a finished session.",
                        session.status
                    );
                }
                _ => {}
            }
            // Fall back to the session name when no backend id is recorded.
            let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
            attach_session(&backend_id)?;
            Ok(format!("Detached from session {name}."))
        }
        Commands::Input { name, text } => {
            // With no text given, a bare newline is sent.
            let input_text = text.as_deref().unwrap_or("\n");
            let body = serde_json::json!({ "text": input_text });
            let resp = authed_post(
                &client,
                format!("{url}/api/v1/sessions/{name}/input"),
                token.as_deref(),
            )
            .json(&body)
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            ok_or_api_error(resp).await?;
            Ok(format!("Sent input to session {name}."))
        }
        Commands::List => {
            let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let sessions: Vec<Session> = serde_json::from_str(&text)?;
            Ok(format_sessions(&sessions))
        }
        Commands::Providers => {
            let resp = authed_get(&client, format!("{url}/api/v1/providers"), token.as_deref())
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let resp: ProvidersResponse = serde_json::from_str(&text)?;
            Ok(format_providers(&resp))
        }
        Commands::Nodes => {
            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let resp: PeersResponse = serde_json::from_str(&text)?;
            Ok(format_nodes(&resp))
        }
        Commands::Spawn {
            workdir,
            name,
            provider,
            auto,
            unrestricted,
            model,
            system_prompt,
            allowed_tools,
            ink,
            max_turns,
            max_budget,
            output_format,
            worktree,
            conversation_id,
            prompt,
        } => {
            let prompt_text = prompt.join(" ");
            let mode = if *auto { "autonomous" } else { "interactive" };
            // Default the working directory to the current directory, or "."
            // if it cannot be determined.
            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
                std::env::current_dir()
                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
            });
            // `workdir` and `mode` are always sent; every other field is added
            // only when it was supplied on the command line.
            let mut body = serde_json::json!({
                "workdir": resolved_workdir,
                "mode": mode,
            });
            if !prompt_text.is_empty() {
                body["prompt"] = serde_json::json!(prompt_text);
            }
            if let Some(p) = provider {
                body["provider"] = serde_json::json!(p);
            }
            if *unrestricted {
                body["unrestricted"] = serde_json::json!(true);
            }
            if let Some(n) = name {
                body["name"] = serde_json::json!(n);
            }
            if let Some(m) = model {
                body["model"] = serde_json::json!(m);
            }
            if let Some(sp) = system_prompt {
                body["system_prompt"] = serde_json::json!(sp);
            }
            if let Some(tools) = allowed_tools {
                body["allowed_tools"] = serde_json::json!(tools);
            }
            if let Some(p) = ink {
                body["ink"] = serde_json::json!(p);
            }
            if let Some(mt) = max_turns {
                body["max_turns"] = serde_json::json!(mt);
            }
            if let Some(mb) = max_budget {
                body["max_budget_usd"] = serde_json::json!(mb);
            }
            if let Some(of) = output_format {
                body["output_format"] = serde_json::json!(of);
            }
            if *worktree {
                body["worktree"] = serde_json::json!(true);
            }
            if let Some(cid) = conversation_id {
                body["conversation_id"] = serde_json::json!(cid);
            }
            let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
                .json(&body)
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
            let mut msg = format!(
                "Created session \"{}\" ({})",
                resp.session.name, resp.session.id
            );
            // Append any server-side warnings, one per line.
            for w in &resp.warnings {
                use std::fmt::Write;
                let _ = write!(msg, "\n  Warning: {w}");
            }
            Ok(msg)
        }
        Commands::Kill { name } => {
            let resp = authed_post(
                &client,
                format!("{url}/api/v1/sessions/{name}/kill"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            ok_or_api_error(resp).await?;
            Ok(format!("Session {name} killed."))
        }
        Commands::Delete { name } => {
            let resp = authed_delete(
                &client,
                format!("{url}/api/v1/sessions/{name}"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            ok_or_api_error(resp).await?;
            Ok(format!("Session {name} deleted."))
        }
        Commands::Logs {
            name,
            lines,
            follow,
        } => {
            if *follow {
                // Output is streamed straight to stdout, so the returned
                // string is empty.
                let mut stdout = std::io::stdout();
                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
                    .await
                    .map_err(|e| {
                        // Only transport errors get the friendly rewrite;
                        // anything else is passed through unchanged.
                        match e.downcast::<reqwest::Error>() {
                            Ok(re) => friendly_error(&re, node),
                            Err(other) => other,
                        }
                    })?;
                Ok(String::new())
            } else {
                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
                    .await
                    .map_err(|e| match e.downcast::<reqwest::Error>() {
                        Ok(re) => friendly_error(&re, node),
                        Err(other) => other,
                    })?;
                Ok(output)
            }
        }
        Commands::Interventions { name } => {
            let resp = authed_get(
                &client,
                format!("{url}/api/v1/sessions/{name}/interventions"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
            Ok(format_interventions(&events))
        }
        Commands::Ui => {
            // The dashboard URL is the node's base URL.
            let dashboard = base_url(&cli.node);
            open_browser(&dashboard)?;
            Ok(format!("Opening {dashboard}"))
        }
        Commands::Resume { name } => {
            let resp = authed_post(
                &client,
                format!("{url}/api/v1/sessions/{name}/resume"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let session: Session = serde_json::from_str(&text)?;
            Ok(format!("Resumed session \"{}\"", session.name))
        }
        Commands::Culture {
            session,
            kind,
            repo,
            ink,
            limit,
            context,
            get,
            delete,
            push,
        } => {
            // Single-item fetch takes precedence over every other flag.
            if let Some(id) = get {
                let endpoint = format!("{url}/api/v1/culture/{id}");
                let resp = authed_get(&client, endpoint, token.as_deref())
                    .send()
                    .await
                    .map_err(|e| friendly_error(&e, node))?;
                let text = ok_or_api_error(resp).await?;
                let resp: CultureItemResponse = serde_json::from_str(&text)?;
                return Ok(format_culture(&[resp.culture]));
            }

            // Next in precedence: delete a single item by id.
            if let Some(id) = delete {
                let endpoint = format!("{url}/api/v1/culture/{id}");
                let resp = authed_delete(&client, endpoint, token.as_deref())
                    .send()
                    .await
                    .map_err(|e| friendly_error(&e, node))?;
                let text = ok_or_api_error(resp).await?;
                let resp: CultureDeleteResponse = serde_json::from_str(&text)?;
                return Ok(if resp.deleted {
                    format!("Deleted culture item {id}")
                } else {
                    format!("Culture item {id} not found")
                });
            }

            // Then: trigger a push and return the server's message.
            if *push {
                let endpoint = format!("{url}/api/v1/culture/push");
                let resp = authed_post(&client, endpoint, token.as_deref())
                    .send()
                    .await
                    .map_err(|e| friendly_error(&e, node))?;
                let text = ok_or_api_error(resp).await?;
                let resp: CulturePushResponse = serde_json::from_str(&text)?;
                return Ok(resp.message);
            }

            // Otherwise run a listing query.
            // NOTE(review): parameter values are placed in the query string
            // verbatim, without percent-encoding.
            let mut params = vec![format!("limit={limit}")];
            let endpoint = if *context {
                // Context mode sends `--repo` as `workdir` and ignores the
                // `session`/`kind` filters.
                if let Some(r) = repo {
                    params.push(format!("workdir={r}"));
                }
                if let Some(i) = ink {
                    params.push(format!("ink={i}"));
                }
                format!("{url}/api/v1/culture/context?{}", params.join("&"))
            } else {
                if let Some(s) = session {
                    params.push(format!("session_id={s}"));
                }
                if let Some(k) = kind {
                    params.push(format!("kind={k}"));
                }
                if let Some(r) = repo {
                    params.push(format!("repo={r}"));
                }
                if let Some(i) = ink {
                    params.push(format!("ink={i}"));
                }
                format!("{url}/api/v1/culture?{}", params.join("&"))
            };
            let resp = authed_get(&client, endpoint, token.as_deref())
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let resp: CultureResponse = serde_json::from_str(&text)?;
            Ok(format_culture(&resp.culture))
        }
        // Schedules are managed locally through the crontab, not via the API.
        Commands::Schedule { action } => execute_schedule(action, node),
    }
}
1230
1231#[cfg(test)]
1232mod tests {
1233 use super::*;
1234 use pulpo_common::session::Provider;
1235
1236 #[test]
1237 fn test_base_url() {
1238 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1239 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1240 }
1241
1242 #[test]
1243 fn test_cli_parse_list() {
1244 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1245 assert_eq!(cli.node, "localhost:7433");
1246 assert!(matches!(cli.command, Commands::List));
1247 }
1248
1249 #[test]
1250 fn test_cli_parse_nodes() {
1251 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1252 assert!(matches!(cli.command, Commands::Nodes));
1253 }
1254
1255 #[test]
1256 fn test_cli_parse_providers() {
1257 let cli = Cli::try_parse_from(["pulpo", "providers"]).unwrap();
1258 assert!(matches!(cli.command, Commands::Providers));
1259 }
1260
1261 #[test]
1262 fn test_cli_parse_providers_alias() {
1263 let cli = Cli::try_parse_from(["pulpo", "p"]).unwrap();
1264 assert!(matches!(cli.command, Commands::Providers));
1265 }
1266
1267 #[test]
1268 fn test_format_providers_all() {
1269 let resp = ProvidersResponse {
1270 providers: vec![
1271 ProviderInfoResponse {
1272 provider: Provider::Claude,
1273 binary: "claude".into(),
1274 available: true,
1275 capabilities: ProviderCapabilitiesResponse {
1276 model: true,
1277 system_prompt: true,
1278 allowed_tools: true,
1279 max_turns: true,
1280 max_budget_usd: true,
1281 output_format: true,
1282 worktree: true,
1283 unrestricted: true,
1284 resume: true,
1285 },
1286 },
1287 ProviderInfoResponse {
1288 provider: Provider::Shell,
1289 binary: "bash".into(),
1290 available: true,
1291 capabilities: ProviderCapabilitiesResponse {
1292 model: false,
1293 system_prompt: false,
1294 allowed_tools: false,
1295 max_turns: false,
1296 max_budget_usd: false,
1297 output_format: false,
1298 worktree: false,
1299 unrestricted: false,
1300 resume: false,
1301 },
1302 },
1303 ],
1304 };
1305 let output = format_providers(&resp);
1306 assert!(output.contains("PROVIDER"));
1307 assert!(output.contains("claude"));
1308 assert!(output.contains("yes"));
1309 assert!(output.contains("shell"));
1310 assert!(output.contains("bash"));
1311 assert!(output.contains('-'));
1313 assert!(output.contains("model"));
1315 assert!(output.contains("system-prompt"));
1316 assert!(output.contains("worktree"));
1317 }
1318
1319 #[test]
1320 fn test_format_providers_unavailable() {
1321 let resp = ProvidersResponse {
1322 providers: vec![ProviderInfoResponse {
1323 provider: Provider::Codex,
1324 binary: "codex".into(),
1325 available: false,
1326 capabilities: ProviderCapabilitiesResponse {
1327 model: true,
1328 system_prompt: false,
1329 allowed_tools: false,
1330 max_turns: false,
1331 max_budget_usd: false,
1332 output_format: false,
1333 worktree: false,
1334 unrestricted: false,
1335 resume: true,
1336 },
1337 }],
1338 };
1339 let output = format_providers(&resp);
1340 assert!(output.contains("no"));
1341 assert!(output.contains("codex"));
1342 }
1343
1344 #[test]
1345 fn test_cli_parse_ui() {
1346 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1347 assert!(matches!(cli.command, Commands::Ui));
1348 }
1349
1350 #[test]
1351 fn test_cli_parse_ui_custom_node() {
1352 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1353 assert!(matches!(cli.command, Commands::Ui));
1354 assert_eq!(cli.node, "mac-mini:7433");
1355 }
1356
/// The open command carries the URL as its only argument and uses the
/// platform opener (`open` on macOS, `xdg-open` on Linux).
#[test]
fn test_build_open_command() {
    let url = "http://localhost:7433";
    let cmd = build_open_command(url);

    // Exactly one argument is expected: the URL itself.
    let mut args = cmd.get_args();
    assert_eq!(args.next(), Some(std::ffi::OsStr::new(url)));
    assert_eq!(args.next(), None);

    #[cfg(target_os = "macos")]
    assert_eq!(cmd.get_program(), "open");
    #[cfg(target_os = "linux")]
    assert_eq!(cmd.get_program(), "xdg-open");
}
1367
/// `spawn` with only `--workdir` keeps the remaining flags at their defaults
/// and collects trailing words into the prompt.
#[test]
fn test_cli_parse_spawn() {
    let argv = ["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix", "the", "bug"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Commands::Spawn { workdir, provider, auto, unrestricted, prompt, .. }
            if workdir.as_deref() == Some("/tmp/repo")
                && provider.is_none()
                && !auto
                && !unrestricted
                && prompt == &["Fix", "the", "bug"]
    );
    assert!(is_expected, "unexpected command: {:?}", parsed.command);
}
1387
/// `--provider` is captured as an optional string on `Commands::Spawn`.
#[test]
fn test_cli_parse_spawn_with_provider() {
    let argv = ["pulpo", "spawn", "--workdir", "/tmp", "--provider", "codex", "Do it"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let picked_codex = matches!(
        &parsed.command,
        Commands::Spawn { provider, .. } if provider.as_deref() == Some("codex")
    );
    assert!(picked_codex, "unexpected command: {:?}", parsed.command);
}
1405
/// Passing `--auto` sets the `auto` flag on `Commands::Spawn`.
#[test]
fn test_cli_parse_spawn_auto() {
    let argv = ["pulpo", "spawn", "--workdir", "/tmp", "--auto", "Do it"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let auto_enabled = matches!(&parsed.command, Commands::Spawn { auto: true, .. });
    assert!(auto_enabled, "unexpected command: {:?}", parsed.command);
}
1415
/// Passing `--unrestricted` sets the `unrestricted` flag on `Commands::Spawn`.
#[test]
fn test_cli_parse_spawn_unrestricted() {
    let argv = ["pulpo", "spawn", "--workdir", "/tmp", "--unrestricted", "Do it"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let unrestricted_enabled = matches!(
        &parsed.command,
        Commands::Spawn {
            unrestricted: true,
            ..
        }
    );
    assert!(
        unrestricted_enabled,
        "unexpected command: {:?}",
        parsed.command
    );
}
1432
/// Without `--unrestricted` the flag stays `false`.
#[test]
fn test_cli_parse_spawn_unrestricted_default() {
    let argv = ["pulpo", "spawn", "--workdir", "/tmp", "Do it"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let restricted = matches!(
        &parsed.command,
        Commands::Spawn {
            unrestricted: false,
            ..
        }
    );
    assert!(restricted, "unexpected command: {:?}", parsed.command);
}
1441
/// `--name` is captured alongside `--workdir` on `Commands::Spawn`.
#[test]
fn test_cli_parse_spawn_with_name() {
    let argv = ["pulpo", "spawn", "--workdir", "/tmp/repo", "--name", "my-task", "Fix it"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Commands::Spawn { workdir, name, .. }
            if name.as_deref() == Some("my-task") && workdir.as_deref() == Some("/tmp/repo")
    );
    assert!(is_expected, "unexpected command: {:?}", parsed.command);
}
1460
/// Omitting `--name` leaves the session name unset.
#[test]
fn test_cli_parse_spawn_without_name() {
    let argv = ["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let unnamed = matches!(&parsed.command, Commands::Spawn { name: None, .. });
    assert!(unnamed, "unexpected command: {:?}", parsed.command);
}
1470
/// `--conversation-id` is captured as an optional string on `Commands::Spawn`.
#[test]
fn test_cli_parse_spawn_with_conversation_id() {
    let argv = [
        "pulpo",
        "spawn",
        "--workdir",
        "/tmp/repo",
        "--conversation-id",
        "conv-abc-123",
        "Fix it",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let has_conversation = matches!(
        &parsed.command,
        Commands::Spawn { conversation_id, .. }
            if conversation_id.as_deref() == Some("conv-abc-123")
    );
    assert!(has_conversation, "unexpected command: {:?}", parsed.command);
}
1489
/// Omitting `--conversation-id` leaves the conversation id unset.
#[test]
fn test_cli_parse_spawn_without_conversation_id() {
    let argv = ["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let no_conversation = matches!(
        &parsed.command,
        Commands::Spawn {
            conversation_id: None,
            ..
        }
    );
    assert!(no_conversation, "unexpected command: {:?}", parsed.command);
}
1499
/// `logs <name>` defaults to 100 lines and does not follow.
#[test]
fn test_cli_parse_logs() {
    let parsed = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Commands::Logs { name, lines: 100, follow: false } if name == "my-session"
    );
    assert!(is_expected, "unexpected command: {:?}", parsed.command);
}
1508
/// `--lines` overrides the default line count while `follow` stays off.
#[test]
fn test_cli_parse_logs_with_lines() {
    let argv = ["pulpo", "logs", "my-session", "--lines", "50"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Commands::Logs { name, lines: 50, follow: false } if name == "my-session"
    );
    assert!(is_expected, "unexpected command: {:?}", parsed.command);
}
1517
/// The long `--follow` flag enables follow mode.
#[test]
fn test_cli_parse_logs_follow() {
    let argv = ["pulpo", "logs", "my-session", "--follow"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let following = matches!(
        &parsed.command,
        Commands::Logs { name, follow: true, .. } if name == "my-session"
    );
    assert!(following, "unexpected command: {:?}", parsed.command);
}
1526
/// The short `-f` flag enables follow mode just like `--follow`.
#[test]
fn test_cli_parse_logs_follow_short() {
    let argv = ["pulpo", "logs", "my-session", "-f"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let following = matches!(
        &parsed.command,
        Commands::Logs { name, follow: true, .. } if name == "my-session"
    );
    assert!(following, "unexpected command: {:?}", parsed.command);
}
1535
/// `kill <name>` parses into `Commands::Kill` with the given session name.
#[test]
fn test_cli_parse_kill() {
    let parsed = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
    let is_kill = matches!(&parsed.command, Commands::Kill { name } if name == "my-session");
    assert!(is_kill, "unexpected command: {:?}", parsed.command);
}
1544
/// `delete <name>` parses into `Commands::Delete` with the given session name.
#[test]
fn test_cli_parse_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
    let is_delete = matches!(&parsed.command, Commands::Delete { name } if name == "my-session");
    assert!(is_delete, "unexpected command: {:?}", parsed.command);
}
1553
/// `resume <name>` parses into `Commands::Resume` with the given session name.
#[test]
fn test_cli_parse_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
    let is_resume = matches!(&parsed.command, Commands::Resume { name } if name == "my-session");
    assert!(is_resume, "unexpected command: {:?}", parsed.command);
}
1562
/// `input <name> <text>` captures both the session name and the text.
#[test]
fn test_cli_parse_input() {
    let argv = ["pulpo", "input", "my-session", "yes"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Commands::Input { name, text }
            if name == "my-session" && text.as_deref() == Some("yes")
    );
    assert!(is_expected, "unexpected command: {:?}", parsed.command);
}
1571
/// The text argument of `input` is optional and parses as `None` when omitted.
#[test]
fn test_cli_parse_input_no_text() {
    let parsed = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Commands::Input { name, text: None } if name == "my-session"
    );
    assert!(is_expected, "unexpected command: {:?}", parsed.command);
}
1580
/// The `i` alias resolves to the same `Commands::Input` variant as `input`.
#[test]
fn test_cli_parse_input_alias() {
    let argv = ["pulpo", "i", "my-session", "y"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Commands::Input { name, text }
            if name == "my-session" && text.as_deref() == Some("y")
    );
    assert!(is_expected, "unexpected command: {:?}", parsed.command);
}
1589
/// A custom `--node` value replaces the default node address.
#[test]
fn test_cli_parse_custom_node() {
    let argv = ["pulpo", "--node", "win-pc:8080", "list"];
    let Cli { node, .. } = Cli::try_parse_from(argv).unwrap();
    assert_eq!(node, "win-pc:8080");
}
1595
/// `--version` makes clap return a `DisplayVersion` "error" instead of a parsed `Cli`.
#[test]
fn test_cli_version() {
    let err = Cli::try_parse_from(["pulpo", "--version"]).unwrap_err();
    assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
}
1603
/// Invoking the binary without any subcommand is a parse error.
#[test]
fn test_cli_parse_no_subcommand_fails() {
    assert!(Cli::try_parse_from(["pulpo"]).is_err());
}
1609
/// The `Debug` output of a parsed `Cli` mentions the selected subcommand.
#[test]
fn test_cli_debug() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    let rendered = format!("{parsed:?}");
    assert!(rendered.contains("List"), "unexpected debug output: {rendered}");
}
1616
/// A unit variant of `Commands` debug-prints as its bare variant name.
#[test]
fn test_commands_debug() {
    let rendered = format!("{:?}", Commands::List);
    assert_eq!(rendered, "List");
}
1622
/// JSON fixture describing a single session named "repo"; the mock server
/// returns it directly and `test_create_response_json` wraps it.
const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","provider":"claude","prompt":"Fix bug","status":"active","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1625
1626 fn test_create_response_json() -> String {
1628 format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1629 }
1630
1631 async fn start_test_server() -> String {
1633 use axum::http::StatusCode;
1634 use axum::{
1635 Json, Router,
1636 routing::{get, post},
1637 };
1638
1639 let create_json = test_create_response_json();
1640
1641 let app = Router::new()
1642 .route(
1643 "/api/v1/sessions",
1644 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1645 (StatusCode::CREATED, create_json.clone())
1646 }),
1647 )
1648 .route(
1649 "/api/v1/sessions/{id}",
1650 get(|| async { TEST_SESSION_JSON.to_owned() })
1651 .delete(|| async { StatusCode::NO_CONTENT }),
1652 )
1653 .route(
1654 "/api/v1/sessions/{id}/kill",
1655 post(|| async { StatusCode::NO_CONTENT }),
1656 )
1657 .route(
1658 "/api/v1/sessions/{id}/output",
1659 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1660 )
1661 .route(
1662 "/api/v1/peers",
1663 get(|| async {
1664 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1665 }),
1666 )
1667 .route(
1668 "/api/v1/sessions/{id}/resume",
1669 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1670 )
1671 .route(
1672 "/api/v1/sessions/{id}/interventions",
1673 get(|| async { "[]".to_owned() }),
1674 )
1675 .route(
1676 "/api/v1/sessions/{id}/input",
1677 post(|| async { StatusCode::NO_CONTENT }),
1678 );
1679
1680 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1681 let addr = listener.local_addr().unwrap();
1682 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1683 format!("127.0.0.1:{}", addr.port())
1684 }
1685
1686 #[tokio::test]
1687 async fn test_execute_list_success() {
1688 let node = start_test_server().await;
1689 let cli = Cli {
1690 node,
1691 token: None,
1692 command: Commands::List,
1693 };
1694 let result = execute(&cli).await.unwrap();
1695 assert_eq!(result, "No sessions.");
1696 }
1697
1698 #[tokio::test]
1699 async fn test_execute_nodes_success() {
1700 let node = start_test_server().await;
1701 let cli = Cli {
1702 node,
1703 token: None,
1704 command: Commands::Nodes,
1705 };
1706 let result = execute(&cli).await.unwrap();
1707 assert!(result.contains("test"));
1708 assert!(result.contains("(local)"));
1709 assert!(result.contains("NAME"));
1710 }
1711
1712 #[tokio::test]
1713 async fn test_execute_spawn_success() {
1714 let node = start_test_server().await;
1715 let cli = Cli {
1716 node,
1717 token: None,
1718 command: Commands::Spawn {
1719 workdir: Some("/tmp/repo".into()),
1720 name: None,
1721 provider: Some("claude".into()),
1722 auto: false,
1723 unrestricted: false,
1724 model: None,
1725 system_prompt: None,
1726 allowed_tools: None,
1727 ink: None,
1728 max_turns: None,
1729 max_budget: None,
1730 output_format: None,
1731 worktree: false,
1732 conversation_id: None,
1733 prompt: vec!["Fix".into(), "bug".into()],
1734 },
1735 };
1736 let result = execute(&cli).await.unwrap();
1737 assert!(result.contains("Created session"));
1738 assert!(result.contains("repo"));
1739 }
1740
1741 #[tokio::test]
1742 async fn test_execute_spawn_with_all_new_flags() {
1743 let node = start_test_server().await;
1744 let cli = Cli {
1745 node,
1746 token: None,
1747 command: Commands::Spawn {
1748 workdir: Some("/tmp/repo".into()),
1749 name: None,
1750 provider: Some("claude".into()),
1751 auto: false,
1752 unrestricted: false,
1753 model: Some("opus".into()),
1754 system_prompt: Some("Be helpful".into()),
1755 allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1756 ink: Some("coder".into()),
1757 max_turns: Some(5),
1758 max_budget: Some(2.5),
1759 output_format: Some("json".into()),
1760 worktree: false,
1761 conversation_id: None,
1762 prompt: vec!["Fix".into(), "bug".into()],
1763 },
1764 };
1765 let result = execute(&cli).await.unwrap();
1766 assert!(result.contains("Created session"));
1767 }
1768
1769 #[tokio::test]
1770 async fn test_execute_spawn_auto_mode() {
1771 let node = start_test_server().await;
1772 let cli = Cli {
1773 node,
1774 token: None,
1775 command: Commands::Spawn {
1776 workdir: Some("/tmp/repo".into()),
1777 name: None,
1778 provider: Some("claude".into()),
1779 auto: true,
1780 unrestricted: false,
1781 model: None,
1782 system_prompt: None,
1783 allowed_tools: None,
1784 ink: None,
1785 max_turns: None,
1786 max_budget: None,
1787 output_format: None,
1788 worktree: false,
1789 conversation_id: None,
1790 prompt: vec!["Do it".into()],
1791 },
1792 };
1793 let result = execute(&cli).await.unwrap();
1794 assert!(result.contains("Created session"));
1795 }
1796
1797 #[tokio::test]
1798 async fn test_execute_spawn_with_name() {
1799 let node = start_test_server().await;
1800 let cli = Cli {
1801 node,
1802 token: None,
1803 command: Commands::Spawn {
1804 workdir: Some("/tmp/repo".into()),
1805 name: Some("my-task".into()),
1806 provider: Some("claude".into()),
1807 auto: false,
1808 unrestricted: false,
1809 model: None,
1810 system_prompt: None,
1811 allowed_tools: None,
1812 ink: None,
1813 max_turns: None,
1814 max_budget: None,
1815 output_format: None,
1816 worktree: false,
1817 conversation_id: None,
1818 prompt: vec!["Fix".into(), "bug".into()],
1819 },
1820 };
1821 let result = execute(&cli).await.unwrap();
1822 assert!(result.contains("Created session"));
1823 }
1824
1825 #[tokio::test]
1826 async fn test_execute_kill_success() {
1827 let node = start_test_server().await;
1828 let cli = Cli {
1829 node,
1830 token: None,
1831 command: Commands::Kill {
1832 name: "test-session".into(),
1833 },
1834 };
1835 let result = execute(&cli).await.unwrap();
1836 assert!(result.contains("killed"));
1837 }
1838
1839 #[tokio::test]
1840 async fn test_execute_delete_success() {
1841 let node = start_test_server().await;
1842 let cli = Cli {
1843 node,
1844 token: None,
1845 command: Commands::Delete {
1846 name: "test-session".into(),
1847 },
1848 };
1849 let result = execute(&cli).await.unwrap();
1850 assert!(result.contains("deleted"));
1851 }
1852
1853 #[tokio::test]
1854 async fn test_execute_logs_success() {
1855 let node = start_test_server().await;
1856 let cli = Cli {
1857 node,
1858 token: None,
1859 command: Commands::Logs {
1860 name: "test-session".into(),
1861 lines: 50,
1862 follow: false,
1863 },
1864 };
1865 let result = execute(&cli).await.unwrap();
1866 assert!(result.contains("test output"));
1867 }
1868
1869 #[tokio::test]
1870 async fn test_execute_list_connection_refused() {
1871 let cli = Cli {
1872 node: "localhost:1".into(),
1873 token: None,
1874 command: Commands::List,
1875 };
1876 let result = execute(&cli).await;
1877 let err = result.unwrap_err().to_string();
1878 assert!(
1879 err.contains("Could not connect to pulpod"),
1880 "Expected friendly error, got: {err}"
1881 );
1882 assert!(err.contains("localhost:1"));
1883 }
1884
1885 #[tokio::test]
1886 async fn test_execute_nodes_connection_refused() {
1887 let cli = Cli {
1888 node: "localhost:1".into(),
1889 token: None,
1890 command: Commands::Nodes,
1891 };
1892 let result = execute(&cli).await;
1893 let err = result.unwrap_err().to_string();
1894 assert!(err.contains("Could not connect to pulpod"));
1895 }
1896
1897 #[tokio::test]
1898 async fn test_execute_kill_error_response() {
1899 use axum::{Router, http::StatusCode, routing::post};
1900
1901 let app = Router::new().route(
1902 "/api/v1/sessions/{id}/kill",
1903 post(|| async {
1904 (
1905 StatusCode::NOT_FOUND,
1906 "{\"error\":\"session not found: test-session\"}",
1907 )
1908 }),
1909 );
1910 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1911 let addr = listener.local_addr().unwrap();
1912 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1913 let node = format!("127.0.0.1:{}", addr.port());
1914
1915 let cli = Cli {
1916 node,
1917 token: None,
1918 command: Commands::Kill {
1919 name: "test-session".into(),
1920 },
1921 };
1922 let err = execute(&cli).await.unwrap_err();
1923 assert_eq!(err.to_string(), "session not found: test-session");
1924 }
1925
1926 #[tokio::test]
1927 async fn test_execute_delete_error_response() {
1928 use axum::{Router, http::StatusCode, routing::delete};
1929
1930 let app = Router::new().route(
1931 "/api/v1/sessions/{id}",
1932 delete(|| async {
1933 (
1934 StatusCode::CONFLICT,
1935 "{\"error\":\"cannot delete session in 'running' state\"}",
1936 )
1937 }),
1938 );
1939 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1940 let addr = listener.local_addr().unwrap();
1941 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1942 let node = format!("127.0.0.1:{}", addr.port());
1943
1944 let cli = Cli {
1945 node,
1946 token: None,
1947 command: Commands::Delete {
1948 name: "test-session".into(),
1949 },
1950 };
1951 let err = execute(&cli).await.unwrap_err();
1952 assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1953 }
1954
1955 #[tokio::test]
1956 async fn test_execute_logs_error_response() {
1957 use axum::{Router, http::StatusCode, routing::get};
1958
1959 let app = Router::new().route(
1960 "/api/v1/sessions/{id}/output",
1961 get(|| async {
1962 (
1963 StatusCode::NOT_FOUND,
1964 "{\"error\":\"session not found: ghost\"}",
1965 )
1966 }),
1967 );
1968 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1969 let addr = listener.local_addr().unwrap();
1970 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1971 let node = format!("127.0.0.1:{}", addr.port());
1972
1973 let cli = Cli {
1974 node,
1975 token: None,
1976 command: Commands::Logs {
1977 name: "ghost".into(),
1978 lines: 50,
1979 follow: false,
1980 },
1981 };
1982 let err = execute(&cli).await.unwrap_err();
1983 assert_eq!(err.to_string(), "session not found: ghost");
1984 }
1985
1986 #[tokio::test]
1987 async fn test_execute_resume_error_response() {
1988 use axum::{Router, http::StatusCode, routing::post};
1989
1990 let app = Router::new().route(
1991 "/api/v1/sessions/{id}/resume",
1992 post(|| async {
1993 (
1994 StatusCode::BAD_REQUEST,
1995 "{\"error\":\"session is not lost (status: active)\"}",
1996 )
1997 }),
1998 );
1999 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2000 let addr = listener.local_addr().unwrap();
2001 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2002 let node = format!("127.0.0.1:{}", addr.port());
2003
2004 let cli = Cli {
2005 node,
2006 token: None,
2007 command: Commands::Resume {
2008 name: "test-session".into(),
2009 },
2010 };
2011 let err = execute(&cli).await.unwrap_err();
2012 assert_eq!(err.to_string(), "session is not lost (status: active)");
2013 }
2014
2015 #[tokio::test]
2016 async fn test_execute_spawn_error_response() {
2017 use axum::{Router, http::StatusCode, routing::post};
2018
2019 let app = Router::new().route(
2020 "/api/v1/sessions",
2021 post(|| async {
2022 (
2023 StatusCode::INTERNAL_SERVER_ERROR,
2024 "{\"error\":\"failed to spawn session\"}",
2025 )
2026 }),
2027 );
2028 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2029 let addr = listener.local_addr().unwrap();
2030 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2031 let node = format!("127.0.0.1:{}", addr.port());
2032
2033 let cli = Cli {
2034 node,
2035 token: None,
2036 command: Commands::Spawn {
2037 workdir: Some("/tmp/repo".into()),
2038 name: None,
2039 provider: Some("claude".into()),
2040 auto: false,
2041 unrestricted: false,
2042 model: None,
2043 system_prompt: None,
2044 allowed_tools: None,
2045 ink: None,
2046 max_turns: None,
2047 max_budget: None,
2048 output_format: None,
2049 worktree: false,
2050 conversation_id: None,
2051 prompt: vec!["test".into()],
2052 },
2053 };
2054 let err = execute(&cli).await.unwrap_err();
2055 assert_eq!(err.to_string(), "failed to spawn session");
2056 }
2057
2058 #[tokio::test]
2059 async fn test_execute_interventions_error_response() {
2060 use axum::{Router, http::StatusCode, routing::get};
2061
2062 let app = Router::new().route(
2063 "/api/v1/sessions/{id}/interventions",
2064 get(|| async {
2065 (
2066 StatusCode::NOT_FOUND,
2067 "{\"error\":\"session not found: ghost\"}",
2068 )
2069 }),
2070 );
2071 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2072 let addr = listener.local_addr().unwrap();
2073 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2074 let node = format!("127.0.0.1:{}", addr.port());
2075
2076 let cli = Cli {
2077 node,
2078 token: None,
2079 command: Commands::Interventions {
2080 name: "ghost".into(),
2081 },
2082 };
2083 let err = execute(&cli).await.unwrap_err();
2084 assert_eq!(err.to_string(), "session not found: ghost");
2085 }
2086
2087 #[tokio::test]
2088 async fn test_execute_resume_success() {
2089 let node = start_test_server().await;
2090 let cli = Cli {
2091 node,
2092 token: None,
2093 command: Commands::Resume {
2094 name: "test-session".into(),
2095 },
2096 };
2097 let result = execute(&cli).await.unwrap();
2098 assert!(result.contains("Resumed session"));
2099 assert!(result.contains("repo"));
2100 }
2101
2102 #[tokio::test]
2103 async fn test_execute_input_success() {
2104 let node = start_test_server().await;
2105 let cli = Cli {
2106 node,
2107 token: None,
2108 command: Commands::Input {
2109 name: "test-session".into(),
2110 text: Some("yes".into()),
2111 },
2112 };
2113 let result = execute(&cli).await.unwrap();
2114 assert!(result.contains("Sent input to session test-session"));
2115 }
2116
2117 #[tokio::test]
2118 async fn test_execute_input_no_text() {
2119 let node = start_test_server().await;
2120 let cli = Cli {
2121 node,
2122 token: None,
2123 command: Commands::Input {
2124 name: "test-session".into(),
2125 text: None,
2126 },
2127 };
2128 let result = execute(&cli).await.unwrap();
2129 assert!(result.contains("Sent input to session test-session"));
2130 }
2131
2132 #[tokio::test]
2133 async fn test_execute_input_connection_refused() {
2134 let cli = Cli {
2135 node: "localhost:1".into(),
2136 token: None,
2137 command: Commands::Input {
2138 name: "test".into(),
2139 text: Some("y".into()),
2140 },
2141 };
2142 let result = execute(&cli).await;
2143 let err = result.unwrap_err().to_string();
2144 assert!(err.contains("Could not connect to pulpod"));
2145 }
2146
2147 #[tokio::test]
2148 async fn test_execute_input_error_response() {
2149 use axum::{Router, http::StatusCode, routing::post};
2150
2151 let app = Router::new().route(
2152 "/api/v1/sessions/{id}/input",
2153 post(|| async {
2154 (
2155 StatusCode::NOT_FOUND,
2156 "{\"error\":\"session not found: ghost\"}",
2157 )
2158 }),
2159 );
2160 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2161 let addr = listener.local_addr().unwrap();
2162 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2163 let node = format!("127.0.0.1:{}", addr.port());
2164
2165 let cli = Cli {
2166 node,
2167 token: None,
2168 command: Commands::Input {
2169 name: "ghost".into(),
2170 text: Some("y".into()),
2171 },
2172 };
2173 let err = execute(&cli).await.unwrap_err();
2174 assert_eq!(err.to_string(), "session not found: ghost");
2175 }
2176
2177 #[tokio::test]
2178 async fn test_execute_ui() {
2179 let cli = Cli {
2180 node: "localhost:7433".into(),
2181 token: None,
2182 command: Commands::Ui,
2183 };
2184 let result = execute(&cli).await.unwrap();
2185 assert!(result.contains("Opening"));
2186 assert!(result.contains("http://localhost:7433"));
2187 }
2188
2189 #[tokio::test]
2190 async fn test_execute_ui_custom_node() {
2191 let cli = Cli {
2192 node: "mac-mini:7433".into(),
2193 token: None,
2194 command: Commands::Ui,
2195 };
2196 let result = execute(&cli).await.unwrap();
2197 assert!(result.contains("http://mac-mini:7433"));
2198 }
2199
/// An empty session list is rendered as the "No sessions." placeholder.
#[test]
fn test_format_sessions_empty() {
    let rendered = format_sessions(&[]);
    assert_eq!(rendered, "No sessions.");
}
2204
/// A single active session is rendered with a header plus its name, status,
/// provider and prompt.
#[test]
fn test_format_sessions_with_data() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "my-api".into(),
        workdir: "/tmp/repo".into(),
        provider: Provider::Claude,
        prompt: "Fix the bug".into(),
        status: SessionStatus::Active,
        mode: SessionMode::Interactive,
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    let rendered = format_sessions(&[session]);
    for expected in ["NAME", "my-api", "active", "claude", "Fix the bug"] {
        assert!(
            rendered.contains(expected),
            "expected {expected:?} in output:\n{rendered}"
        );
    }
}
2247
/// A prompt longer than forty characters is shortened with an ellipsis ("...").
#[test]
fn test_format_sessions_long_prompt_truncated() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        provider: Provider::Codex,
        prompt: "A very long prompt that exceeds forty characters in total length".into(),
        status: SessionStatus::Finished,
        mode: SessionMode::Autonomous,
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_code: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    let rendered = format_sessions(&[session]);
    assert!(rendered.contains("..."), "unexpected output:\n{rendered}");
}
2286
/// The node listing shows the local node (tagged "(local)") and each peer
/// together with its session count.
#[test]
fn test_format_nodes() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "mac-mini".into(),
        hostname: "h".into(),
        os: "macos".into(),
        arch: "arm64".into(),
        cpus: 8,
        memory_mb: 16384,
        gpu: None,
    };
    let peer = PeerInfo {
        name: "win-pc".into(),
        address: "win-pc:7433".into(),
        status: PeerStatus::Online,
        node_info: None,
        session_count: Some(3),
        source: PeerSource::Configured,
    };
    let resp = PeersResponse {
        local,
        peers: vec![peer],
    };

    let rendered = format_nodes(&resp);
    for expected in ["mac-mini", "(local)", "win-pc"] {
        assert!(
            rendered.contains(expected),
            "expected {expected:?} in output:\n{rendered}"
        );
    }
    // NOTE(review): the peer address also contains '3'; if the address is
    // rendered, this check alone does not prove the count is shown. Confirm.
    assert!(rendered.contains('3'));
}
2317
/// An offline peer without a session count is rendered with a '-' on the
/// third output line.
#[test]
fn test_format_nodes_no_session_count() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "local".into(),
        hostname: "h".into(),
        os: "linux".into(),
        arch: "x86_64".into(),
        cpus: 4,
        memory_mb: 8192,
        gpu: None,
    };
    let peer = PeerInfo {
        name: "peer".into(),
        address: "peer:7433".into(),
        status: PeerStatus::Offline,
        node_info: None,
        session_count: None,
        source: PeerSource::Configured,
    };
    let resp = PeersResponse {
        local,
        peers: vec![peer],
    };

    let rendered = format_nodes(&resp);
    assert!(rendered.contains("offline"));
    // Index 2 is presumably the peer row (after the header and the local node).
    let third_line = rendered.lines().nth(2).unwrap();
    assert!(third_line.contains('-'));
}
2348
2349 #[tokio::test]
2350 async fn test_execute_resume_connection_refused() {
2351 let cli = Cli {
2352 node: "localhost:1".into(),
2353 token: None,
2354 command: Commands::Resume {
2355 name: "test".into(),
2356 },
2357 };
2358 let result = execute(&cli).await;
2359 let err = result.unwrap_err().to_string();
2360 assert!(err.contains("Could not connect to pulpod"));
2361 }
2362
2363 #[tokio::test]
2364 async fn test_execute_spawn_connection_refused() {
2365 let cli = Cli {
2366 node: "localhost:1".into(),
2367 token: None,
2368 command: Commands::Spawn {
2369 workdir: Some("/tmp".into()),
2370 name: None,
2371 provider: Some("claude".into()),
2372 auto: false,
2373 unrestricted: false,
2374 model: None,
2375 system_prompt: None,
2376 allowed_tools: None,
2377 ink: None,
2378 max_turns: None,
2379 max_budget: None,
2380 output_format: None,
2381 worktree: false,
2382 conversation_id: None,
2383 prompt: vec!["test".into()],
2384 },
2385 };
2386 let result = execute(&cli).await;
2387 let err = result.unwrap_err().to_string();
2388 assert!(err.contains("Could not connect to pulpod"));
2389 }
2390
2391 #[tokio::test]
2392 async fn test_execute_kill_connection_refused() {
2393 let cli = Cli {
2394 node: "localhost:1".into(),
2395 token: None,
2396 command: Commands::Kill {
2397 name: "test".into(),
2398 },
2399 };
2400 let result = execute(&cli).await;
2401 let err = result.unwrap_err().to_string();
2402 assert!(err.contains("Could not connect to pulpod"));
2403 }
2404
2405 #[tokio::test]
2406 async fn test_execute_delete_connection_refused() {
2407 let cli = Cli {
2408 node: "localhost:1".into(),
2409 token: None,
2410 command: Commands::Delete {
2411 name: "test".into(),
2412 },
2413 };
2414 let result = execute(&cli).await;
2415 let err = result.unwrap_err().to_string();
2416 assert!(err.contains("Could not connect to pulpod"));
2417 }
2418
2419 #[tokio::test]
2420 async fn test_execute_logs_connection_refused() {
2421 let cli = Cli {
2422 node: "localhost:1".into(),
2423 token: None,
2424 command: Commands::Logs {
2425 name: "test".into(),
2426 lines: 50,
2427 follow: false,
2428 },
2429 };
2430 let result = execute(&cli).await;
2431 let err = result.unwrap_err().to_string();
2432 assert!(err.contains("Could not connect to pulpod"));
2433 }
2434
2435 #[tokio::test]
2436 async fn test_friendly_error_connect() {
2437 let err = reqwest::Client::new()
2439 .get("http://127.0.0.1:1")
2440 .send()
2441 .await
2442 .unwrap_err();
2443 let friendly = friendly_error(&err, "test-node:1");
2444 let msg = friendly.to_string();
2445 assert!(
2446 msg.contains("Could not connect"),
2447 "Expected connect message, got: {msg}"
2448 );
2449 }
2450
2451 #[tokio::test]
2452 async fn test_friendly_error_other() {
2453 let err = reqwest::Client::new()
2455 .get("http://[::invalid::url")
2456 .send()
2457 .await
2458 .unwrap_err();
2459 let friendly = friendly_error(&err, "bad-host");
2460 let msg = friendly.to_string();
2461 assert!(
2462 msg.contains("Network error"),
2463 "Expected network error message, got: {msg}"
2464 );
2465 assert!(msg.contains("bad-host"));
2466 }
2467
/// Loopback spellings (with or without a port) count as localhost; other
/// hostnames and LAN addresses do not.
#[test]
fn test_is_localhost_variants() {
    let local = [
        "localhost:7433",
        "127.0.0.1:7433",
        "[::1]:7433",
        "::1",
        "localhost",
    ];
    for node in local {
        assert!(is_localhost(node), "{node} should count as localhost");
    }

    let remote = ["mac-mini:7433", "192.168.1.100:7433"];
    for node in remote {
        assert!(!is_localhost(node), "{node} should not count as localhost");
    }
}
2480
/// A GET request built with a token carries an `Authorization: Bearer` header.
#[test]
fn test_authed_get_with_token() {
    let client = reqwest::Client::new();
    let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
        .build()
        .unwrap();
    let auth = req
        .headers()
        .get("authorization")
        .map(|value| value.to_str().unwrap());
    assert_eq!(auth, Some("Bearer tok"));
}
2495
/// A GET request built without a token has no `Authorization` header.
#[test]
fn test_authed_get_without_token() {
    let client = reqwest::Client::new();
    let req = authed_get(&client, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert!(!req.headers().contains_key("authorization"));
}
2504
/// A POST request built with a token carries an `Authorization: Bearer` header.
#[test]
fn test_authed_post_with_token() {
    let client = reqwest::Client::new();
    let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
        .build()
        .unwrap();
    let auth = req
        .headers()
        .get("authorization")
        .map(|value| value.to_str().unwrap());
    assert_eq!(auth, Some("Bearer secret"));
}
2519
/// A POST request built without a token has no `Authorization` header.
#[test]
fn test_authed_post_without_token() {
    let client = reqwest::Client::new();
    let req = authed_post(&client, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert!(!req.headers().contains_key("authorization"));
}
2528
/// With a token supplied, `authed_delete` attaches an `Authorization: Bearer …` header.
#[test]
fn test_authed_delete_with_token() {
    let http = reqwest::Client::new();
    let built = authed_delete(&http, "http://h:1/api".into(), Some("del-tok"))
        .build()
        .unwrap();
    let header = built.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer del-tok");
}
2543
/// Without a token, `authed_delete` must not set an `Authorization` header.
#[test]
fn test_authed_delete_without_token() {
    let http = reqwest::Client::new();
    let built = authed_delete(&http, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    let header = built.headers().get("authorization");
    assert!(header.is_none());
}
2552
2553 #[tokio::test]
2554 async fn test_resolve_token_explicit() {
2555 let client = reqwest::Client::new();
2556 let token =
2557 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2558 assert_eq!(token, Some("my-tok".into()));
2559 }
2560
2561 #[tokio::test]
2562 async fn test_resolve_token_remote_no_explicit() {
2563 let client = reqwest::Client::new();
2564 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2565 assert_eq!(token, None);
2566 }
2567
2568 #[tokio::test]
2569 async fn test_resolve_token_localhost_auto_discover() {
2570 use axum::{Json, Router, routing::get};
2571
2572 let app = Router::new().route(
2573 "/api/v1/auth/token",
2574 get(|| async {
2575 Json(AuthTokenResponse {
2576 token: "discovered".into(),
2577 })
2578 }),
2579 );
2580 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2581 let addr = listener.local_addr().unwrap();
2582 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2583
2584 let node = format!("localhost:{}", addr.port());
2585 let base = base_url(&node);
2586 let client = reqwest::Client::new();
2587 let token = resolve_token(&client, &base, &node, None).await;
2588 assert_eq!(token, Some("discovered".into()));
2589 }
2590
2591 #[tokio::test]
2592 async fn test_discover_token_empty_returns_none() {
2593 use axum::{Json, Router, routing::get};
2594
2595 let app = Router::new().route(
2596 "/api/v1/auth/token",
2597 get(|| async {
2598 Json(AuthTokenResponse {
2599 token: String::new(),
2600 })
2601 }),
2602 );
2603 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2604 let addr = listener.local_addr().unwrap();
2605 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2606
2607 let base = format!("http://127.0.0.1:{}", addr.port());
2608 let client = reqwest::Client::new();
2609 assert_eq!(discover_token(&client, &base).await, None);
2610 }
2611
2612 #[tokio::test]
2613 async fn test_discover_token_unreachable_returns_none() {
2614 let client = reqwest::Client::new();
2615 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2616 }
2617
/// `--token <value>` is captured in `Cli::token`.
#[test]
fn test_cli_parse_with_token() {
    let argv = ["pulpo", "--token", "my-secret", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert_eq!(parsed.token.as_deref(), Some("my-secret"));
}
2623
/// When `--token` is omitted, `Cli::token` is `None`.
#[test]
fn test_cli_parse_without_token() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    assert!(parsed.token.is_none());
}
2629
2630 #[tokio::test]
2631 async fn test_execute_with_explicit_token_sends_header() {
2632 use axum::{Router, extract::Request, http::StatusCode, routing::get};
2633
2634 let app = Router::new().route(
2635 "/api/v1/sessions",
2636 get(|req: Request| async move {
2637 let auth = req
2638 .headers()
2639 .get("authorization")
2640 .and_then(|v| v.to_str().ok())
2641 .unwrap_or("");
2642 assert_eq!(auth, "Bearer test-token");
2643 (StatusCode::OK, "[]".to_owned())
2644 }),
2645 );
2646 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2647 let addr = listener.local_addr().unwrap();
2648 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2649 let node = format!("127.0.0.1:{}", addr.port());
2650
2651 let cli = Cli {
2652 node,
2653 token: Some("test-token".into()),
2654 command: Commands::List,
2655 };
2656 let result = execute(&cli).await.unwrap();
2657 assert_eq!(result, "No sessions.");
2658 }
2659
/// `pulpo interventions <name>` parses into `Commands::Interventions`.
#[test]
fn test_cli_parse_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
    let Commands::Interventions { name } = &parsed.command else {
        panic!("expected Interventions command");
    };
    assert_eq!(name, "my-session");
}
2670
/// An empty event list is rendered as a fixed placeholder message.
#[test]
fn test_format_interventions_empty() {
    let rendered = format_interventions(&[]);
    assert_eq!(rendered, "No intervention events.");
}
2675
/// With events present, the rendered text contains the column headers plus
/// each event's reason and the first event's timestamp.
#[test]
fn test_format_interventions_with_data() {
    let first = InterventionEventResponse {
        id: 1,
        session_id: "sess-1".into(),
        code: None,
        reason: "Memory exceeded threshold".into(),
        created_at: "2026-01-01T00:00:00Z".into(),
    };
    let second = InterventionEventResponse {
        id: 2,
        session_id: "sess-1".into(),
        code: None,
        reason: "Idle for 10 minutes".into(),
        created_at: "2026-01-02T00:00:00Z".into(),
    };
    let events = vec![first, second];

    let rendered = format_interventions(&events);
    let expected_fragments = [
        "ID",
        "TIMESTAMP",
        "REASON",
        "Memory exceeded threshold",
        "Idle for 10 minutes",
        "2026-01-01T00:00:00Z",
    ];
    for fragment in expected_fragments {
        assert!(rendered.contains(fragment));
    }
}
2702
2703 #[tokio::test]
2704 async fn test_execute_interventions_empty() {
2705 let node = start_test_server().await;
2706 let cli = Cli {
2707 node,
2708 token: None,
2709 command: Commands::Interventions {
2710 name: "my-session".into(),
2711 },
2712 };
2713 let result = execute(&cli).await.unwrap();
2714 assert_eq!(result, "No intervention events.");
2715 }
2716
2717 #[tokio::test]
2718 async fn test_execute_interventions_with_data() {
2719 use axum::{Router, routing::get};
2720
2721 let app = Router::new().route(
2722 "/api/v1/sessions/{id}/interventions",
2723 get(|| async {
2724 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2725 .to_owned()
2726 }),
2727 );
2728 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2729 let addr = listener.local_addr().unwrap();
2730 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2731 let node = format!("127.0.0.1:{}", addr.port());
2732
2733 let cli = Cli {
2734 node,
2735 token: None,
2736 command: Commands::Interventions {
2737 name: "test".into(),
2738 },
2739 };
2740 let result = execute(&cli).await.unwrap();
2741 assert!(result.contains("OOM"));
2742 assert!(result.contains("2026-01-01T00:00:00Z"));
2743 }
2744
2745 #[tokio::test]
2746 async fn test_execute_interventions_connection_refused() {
2747 let cli = Cli {
2748 node: "localhost:1".into(),
2749 token: None,
2750 command: Commands::Interventions {
2751 name: "test".into(),
2752 },
2753 };
2754 let result = execute(&cli).await;
2755 let err = result.unwrap_err().to_string();
2756 assert!(err.contains("Could not connect to pulpod"));
2757 }
2758
/// `build_attach_command` produces `tmux attach-session -t <name>`.
#[test]
fn test_build_attach_command() {
    let attach = build_attach_command("my-session");
    let program = attach.get_program();
    let argv: Vec<&std::ffi::OsStr> = attach.get_args().collect();
    assert_eq!(program, "tmux");
    assert_eq!(argv, vec!["attach-session", "-t", "my-session"]);
}
2768
/// `pulpo attach <name>` parses into `Commands::Attach`.
#[test]
fn test_cli_parse_attach() {
    let parsed = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
    let Commands::Attach { name } = &parsed.command else {
        panic!("expected Attach command");
    };
    assert_eq!(name, "my-session");
}
2777
/// The short alias `a` parses into `Commands::Attach`.
#[test]
fn test_cli_parse_attach_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
    let Commands::Attach { name } = &parsed.command else {
        panic!("expected Attach command");
    };
    assert_eq!(name, "my-session");
}
2786
2787 #[tokio::test]
2788 async fn test_execute_attach_success() {
2789 let node = start_test_server().await;
2790 let cli = Cli {
2791 node,
2792 token: None,
2793 command: Commands::Attach {
2794 name: "test-session".into(),
2795 },
2796 };
2797 let result = execute(&cli).await.unwrap();
2798 assert!(result.contains("Detached from session test-session"));
2799 }
2800
2801 #[tokio::test]
2802 async fn test_execute_attach_with_backend_session_id() {
2803 use axum::{Router, routing::get};
2804 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","provider":"claude","prompt":"test","status":"active","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2805 let app = Router::new().route(
2806 "/api/v1/sessions/{id}",
2807 get(move || async move { session_json.to_owned() }),
2808 );
2809 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2810 let addr = listener.local_addr().unwrap();
2811 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2812
2813 let cli = Cli {
2814 node: format!("127.0.0.1:{}", addr.port()),
2815 token: None,
2816 command: Commands::Attach {
2817 name: "my-session".into(),
2818 },
2819 };
2820 let result = execute(&cli).await.unwrap();
2821 assert!(result.contains("Detached from session my-session"));
2822 }
2823
2824 #[tokio::test]
2825 async fn test_execute_attach_connection_refused() {
2826 let cli = Cli {
2827 node: "localhost:1".into(),
2828 token: None,
2829 command: Commands::Attach {
2830 name: "test-session".into(),
2831 },
2832 };
2833 let result = execute(&cli).await;
2834 let err = result.unwrap_err().to_string();
2835 assert!(err.contains("Could not connect to pulpod"));
2836 }
2837
2838 #[tokio::test]
2839 async fn test_execute_attach_error_response() {
2840 use axum::{Router, http::StatusCode, routing::get};
2841 let app = Router::new().route(
2842 "/api/v1/sessions/{id}",
2843 get(|| async {
2844 (
2845 StatusCode::NOT_FOUND,
2846 r#"{"error":"session not found"}"#.to_owned(),
2847 )
2848 }),
2849 );
2850 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2851 let addr = listener.local_addr().unwrap();
2852 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2853
2854 let cli = Cli {
2855 node: format!("127.0.0.1:{}", addr.port()),
2856 token: None,
2857 command: Commands::Attach {
2858 name: "nonexistent".into(),
2859 },
2860 };
2861 let result = execute(&cli).await;
2862 let err = result.unwrap_err().to_string();
2863 assert!(err.contains("session not found"));
2864 }
2865
2866 #[tokio::test]
2867 async fn test_execute_attach_stale_session() {
2868 use axum::{Router, routing::get};
2869 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","provider":"claude","prompt":"test","status":"lost","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2870 let app = Router::new().route(
2871 "/api/v1/sessions/{id}",
2872 get(move || async move { session_json.to_owned() }),
2873 );
2874 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2875 let addr = listener.local_addr().unwrap();
2876 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2877
2878 let cli = Cli {
2879 node: format!("127.0.0.1:{}", addr.port()),
2880 token: None,
2881 command: Commands::Attach {
2882 name: "stale-sess".into(),
2883 },
2884 };
2885 let result = execute(&cli).await;
2886 let err = result.unwrap_err().to_string();
2887 assert!(err.contains("lost"));
2888 assert!(err.contains("pulpo resume"));
2889 }
2890
2891 #[tokio::test]
2892 async fn test_execute_attach_dead_session() {
2893 use axum::{Router, routing::get};
2894 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","provider":"claude","prompt":"test","status":"killed","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2895 let app = Router::new().route(
2896 "/api/v1/sessions/{id}",
2897 get(move || async move { session_json.to_owned() }),
2898 );
2899 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2900 let addr = listener.local_addr().unwrap();
2901 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2902
2903 let cli = Cli {
2904 node: format!("127.0.0.1:{}", addr.port()),
2905 token: None,
2906 command: Commands::Attach {
2907 name: "dead-sess".into(),
2908 },
2909 };
2910 let result = execute(&cli).await;
2911 let err = result.unwrap_err().to_string();
2912 assert!(err.contains("killed"));
2913 assert!(err.contains("cannot attach"));
2914 }
2915
/// The short alias `s` parses into `Commands::Spawn`.
#[test]
fn test_cli_parse_alias_spawn() {
    let argv = ["pulpo", "s", "--workdir", "/tmp", "Do it"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_spawn = matches!(&parsed.command, Commands::Spawn { .. });
    assert!(is_spawn);
}
2923
/// The alias `ls` parses into `Commands::List`.
#[test]
fn test_cli_parse_alias_list() {
    let parsed = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
    let is_list = matches!(parsed.command, Commands::List);
    assert!(is_list);
}
2929
/// The short alias `l` parses into `Commands::Logs` with the session name.
#[test]
fn test_cli_parse_alias_logs() {
    let parsed = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
    let Commands::Logs { name, .. } = &parsed.command else {
        panic!("expected Logs command");
    };
    assert_eq!(name, "my-session");
}
2938
/// The short alias `k` parses into `Commands::Kill`.
#[test]
fn test_cli_parse_alias_kill() {
    let parsed = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
    let Commands::Kill { name } = &parsed.command else {
        panic!("expected Kill command");
    };
    assert_eq!(name, "my-session");
}
2947
/// The alias `rm` parses into `Commands::Delete`.
#[test]
fn test_cli_parse_alias_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
    let Commands::Delete { name } = &parsed.command else {
        panic!("expected Delete command");
    };
    assert_eq!(name, "my-session");
}
2956
/// The short alias `r` parses into `Commands::Resume`.
#[test]
fn test_cli_parse_alias_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
    let Commands::Resume { name } = &parsed.command else {
        panic!("expected Resume command");
    };
    assert_eq!(name, "my-session");
}
2965
/// The short alias `n` parses into `Commands::Nodes`.
#[test]
fn test_cli_parse_alias_nodes() {
    let parsed = Cli::try_parse_from(["pulpo", "n"]).unwrap();
    let is_nodes = matches!(parsed.command, Commands::Nodes);
    assert!(is_nodes);
}
2971
/// The alias `iv` parses into `Commands::Interventions`.
#[test]
fn test_cli_parse_alias_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
    let Commands::Interventions { name } = &parsed.command else {
        panic!("expected Interventions command");
    };
    assert_eq!(name, "my-session");
}
2980
/// A JSON body with an `error` field yields just that field's text.
#[test]
fn test_api_error_json() {
    let body = r#"{"error":"session not found: foo"}"#;
    let message = api_error(body).to_string();
    assert_eq!(message, "session not found: foo");
}
2986
/// A non-JSON body is passed through verbatim as the error text.
#[test]
fn test_api_error_plain_text() {
    let body = "plain text error";
    let message = api_error(body).to_string();
    assert_eq!(message, body);
}
2992
/// With no previous output, the whole new output is the diff.
#[test]
fn test_diff_output_empty_prev() {
    let new = "line1\nline2\n";
    let delta = diff_output("", new);
    assert_eq!(delta, "line1\nline2\n");
}
2999
/// Identical previous and new output produce an empty diff.
#[test]
fn test_diff_output_identical() {
    let snapshot = "line1\nline2";
    let delta = diff_output(snapshot, snapshot);
    assert_eq!(delta, "");
}
3004
/// When new output extends the previous output, only the appended lines are returned.
#[test]
fn test_diff_output_new_lines_appended() {
    let delta = diff_output("line1\nline2", "line1\nline2\nline3\nline4");
    assert_eq!(delta, "line3\nline4");
}
3011
/// When the first line has dropped off and one new line has appeared,
/// only the new line is returned.
#[test]
fn test_diff_output_scrolled_window() {
    let delta = diff_output("line1\nline2\nline3", "line2\nline3\nline4");
    assert_eq!(delta, "line4");
}
3019
/// With no lines in common, the entire new output is returned.
#[test]
fn test_diff_output_completely_different() {
    let delta = diff_output("aaa\nbbb", "xxx\nyyy");
    assert_eq!(delta, "xxx\nyyy");
}
3026
/// The previous last line ("common") reappears in the new output, but the
/// line before it differs; the full new output is expected back.
#[test]
fn test_diff_output_last_line_matches_but_overlap_fails() {
    let delta = diff_output("aaa\ncommon", "zzz\ncommon\nnew_line");
    assert_eq!(delta, "zzz\ncommon\nnew_line");
}
3037
/// An empty new output produces an empty diff.
#[test]
fn test_diff_output_new_empty() {
    let delta = diff_output("line1", "");
    assert_eq!(delta, "");
}
3042
3043 async fn start_follow_test_server() -> String {
3047 use axum::{Router, extract::Path, extract::Query, routing::get};
3048 use std::sync::Arc;
3049 use std::sync::atomic::{AtomicUsize, Ordering};
3050
3051 let call_count = Arc::new(AtomicUsize::new(0));
3052 let output_count = call_count.clone();
3053 let status_count = Arc::new(AtomicUsize::new(0));
3054 let status_count_inner = status_count.clone();
3055
3056 let app = Router::new()
3057 .route(
3058 "/api/v1/sessions/{id}/output",
3059 get(
3060 move |_path: Path<String>,
3061 _query: Query<std::collections::HashMap<String, String>>| {
3062 let count = output_count.clone();
3063 async move {
3064 let n = count.fetch_add(1, Ordering::SeqCst);
3065 let output = match n {
3066 0 => "line1\nline2".to_owned(),
3067 1 => "line1\nline2\nline3".to_owned(),
3068 _ => "line2\nline3\nline4".to_owned(),
3069 };
3070 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
3071 }
3072 },
3073 ),
3074 )
3075 .route(
3076 "/api/v1/sessions/{id}",
3077 get(move |_path: Path<String>| {
3078 let count = status_count_inner.clone();
3079 async move {
3080 let n = count.fetch_add(1, Ordering::SeqCst);
3081 let status = if n < 2 { "active" } else { "finished" };
3082 format!(
3083 r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
3084 )
3085 }
3086 }),
3087 );
3088
3089 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3090 let addr = listener.local_addr().unwrap();
3091 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3092 format!("http://127.0.0.1:{}", addr.port())
3093 }
3094
3095 #[tokio::test]
3096 async fn test_follow_logs_polls_and_exits_on_completed() {
3097 let base = start_follow_test_server().await;
3098 let client = reqwest::Client::new();
3099 let mut buf = Vec::new();
3100
3101 follow_logs(&client, &base, "test", 100, None, &mut buf)
3102 .await
3103 .unwrap();
3104
3105 let output = String::from_utf8(buf).unwrap();
3106 assert!(output.contains("line1"));
3108 assert!(output.contains("line2"));
3109 assert!(output.contains("line3"));
3110 assert!(output.contains("line4"));
3111 }
3112
3113 #[tokio::test]
3114 async fn test_execute_logs_follow_success() {
3115 let base = start_follow_test_server().await;
3116 let node = base.strip_prefix("http://").unwrap().to_owned();
3118
3119 let cli = Cli {
3120 node,
3121 token: None,
3122 command: Commands::Logs {
3123 name: "test".into(),
3124 lines: 100,
3125 follow: true,
3126 },
3127 };
3128 let result = execute(&cli).await.unwrap();
3130 assert_eq!(result, "");
3131 }
3132
3133 #[tokio::test]
3134 async fn test_execute_logs_follow_connection_refused() {
3135 let cli = Cli {
3136 node: "localhost:1".into(),
3137 token: None,
3138 command: Commands::Logs {
3139 name: "test".into(),
3140 lines: 50,
3141 follow: true,
3142 },
3143 };
3144 let result = execute(&cli).await;
3145 let err = result.unwrap_err().to_string();
3146 assert!(
3147 err.contains("Could not connect to pulpod"),
3148 "Expected friendly error, got: {err}"
3149 );
3150 }
3151
3152 #[tokio::test]
3153 async fn test_follow_logs_exits_on_dead() {
3154 use axum::{Router, extract::Path, extract::Query, routing::get};
3155
3156 let app = Router::new()
3157 .route(
3158 "/api/v1/sessions/{id}/output",
3159 get(
3160 |_path: Path<String>,
3161 _query: Query<std::collections::HashMap<String, String>>| async {
3162 r#"{"output":"some output"}"#.to_owned()
3163 },
3164 ),
3165 )
3166 .route(
3167 "/api/v1/sessions/{id}",
3168 get(|_path: Path<String>| async {
3169 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"killed","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3170 }),
3171 );
3172
3173 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3174 let addr = listener.local_addr().unwrap();
3175 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3176 let base = format!("http://127.0.0.1:{}", addr.port());
3177
3178 let client = reqwest::Client::new();
3179 let mut buf = Vec::new();
3180 follow_logs(&client, &base, "test", 100, None, &mut buf)
3181 .await
3182 .unwrap();
3183
3184 let output = String::from_utf8(buf).unwrap();
3185 assert!(output.contains("some output"));
3186 }
3187
3188 #[tokio::test]
3189 async fn test_follow_logs_exits_on_stale() {
3190 use axum::{Router, extract::Path, extract::Query, routing::get};
3191
3192 let app = Router::new()
3193 .route(
3194 "/api/v1/sessions/{id}/output",
3195 get(
3196 |_path: Path<String>,
3197 _query: Query<std::collections::HashMap<String, String>>| async {
3198 r#"{"output":"stale output"}"#.to_owned()
3199 },
3200 ),
3201 )
3202 .route(
3203 "/api/v1/sessions/{id}",
3204 get(|_path: Path<String>| async {
3205 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"lost","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3206 }),
3207 );
3208
3209 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3210 let addr = listener.local_addr().unwrap();
3211 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3212 let base = format!("http://127.0.0.1:{}", addr.port());
3213
3214 let client = reqwest::Client::new();
3215 let mut buf = Vec::new();
3216 follow_logs(&client, &base, "test", 100, None, &mut buf)
3217 .await
3218 .unwrap();
3219
3220 let output = String::from_utf8(buf).unwrap();
3221 assert!(output.contains("stale output"));
3222 }
3223
3224 #[tokio::test]
3225 async fn test_execute_logs_follow_non_reqwest_error() {
3226 use axum::{Router, extract::Path, extract::Query, routing::get};
3227
3228 let app = Router::new()
3230 .route(
3231 "/api/v1/sessions/{id}/output",
3232 get(
3233 |_path: Path<String>,
3234 _query: Query<std::collections::HashMap<String, String>>| async {
3235 r#"{"output":"initial"}"#.to_owned()
3236 },
3237 ),
3238 )
3239 .route(
3240 "/api/v1/sessions/{id}",
3241 get(|_path: Path<String>| async { "not valid json".to_owned() }),
3242 );
3243
3244 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3245 let addr = listener.local_addr().unwrap();
3246 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3247 let node = format!("127.0.0.1:{}", addr.port());
3248
3249 let cli = Cli {
3250 node,
3251 token: None,
3252 command: Commands::Logs {
3253 name: "test".into(),
3254 lines: 100,
3255 follow: true,
3256 },
3257 };
3258 let err = execute(&cli).await.unwrap_err();
3259 let msg = err.to_string();
3261 assert!(
3262 msg.contains("expected ident"),
3263 "Expected serde parse error, got: {msg}"
3264 );
3265 }
3266
/// `--max-turns`, `--max-budget` and `--output-format` are parsed into the
/// matching `Commands::Spawn` fields.
#[test]
fn test_cli_parse_spawn_with_guardrails() {
    let argv = [
        "pulpo",
        "spawn",
        "--workdir",
        "/tmp",
        "--max-turns",
        "10",
        "--max-budget",
        "5.5",
        "--output-format",
        "json",
        "Do it",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Spawn {
        max_turns,
        max_budget,
        output_format,
        ..
    } = &parsed.command
    else {
        panic!("expected Spawn command");
    };
    assert_eq!(*max_turns, Some(10));
    assert_eq!(*max_budget, Some(5.5));
    assert_eq!(output_format.as_deref(), Some("json"));
}
3290
3291 #[tokio::test]
3292 async fn test_fetch_session_status_connection_error() {
3293 let client = reqwest::Client::new();
3294 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3295 assert!(result.is_err());
3296 }
3297
/// `build_crontab_line` renders the cron expression, the `pulpo spawn`
/// invocation and a trailing `#pulpo:<name>` tag, ending in a newline.
#[test]
fn test_build_crontab_line() {
    let expected = "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n";
    let rendered = build_crontab_line(
        "nightly-review",
        "0 3 * * *",
        "/home/me/repo",
        "claude",
        "Review PRs",
        "localhost:7433",
    );
    assert_eq!(rendered, expected);
}
3315
/// Installing a new entry keeps the existing crontab content and puts the
/// new tagged line at the end.
#[test]
fn test_crontab_install_success() {
    let existing = "# existing cron\n0 * * * * echo hi\n";
    let entry = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";
    let updated = crontab_install(existing, "my-job", entry).unwrap();
    assert!(updated.starts_with("# existing cron\n"));
    assert!(updated.contains("echo hi"));
    assert!(updated.ends_with("#pulpo:my-job\n"));
}
3325
/// Installing under a name that is already tagged in the crontab is rejected.
#[test]
fn test_crontab_install_duplicate_error() {
    let existing = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let entry = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
    let message = crontab_install(existing, "my-job", entry)
        .unwrap_err()
        .to_string();
    assert!(message.contains("already exists"));
}
3333
/// An empty crontab lists as the "no schedules" placeholder.
#[test]
fn test_crontab_list_empty() {
    let listing = crontab_list("");
    assert_eq!(listing, "No pulpo schedules.");
}
3338
/// A crontab with only untagged entries lists as the "no schedules" placeholder.
#[test]
fn test_crontab_list_no_pulpo_entries() {
    let listing = crontab_list("0 * * * * echo hi\n");
    assert_eq!(listing, "No pulpo schedules.");
}
3343
/// A tagged, uncommented entry is listed with the column headers, its name,
/// its cron expression and a "no" paused marker.
#[test]
fn test_crontab_list_with_entries() {
    let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
    let listing = crontab_list(crontab);
    for fragment in ["NAME", "CRON", "PAUSED", "nightly", "0 3 * * *", "no"] {
        assert!(listing.contains(fragment));
    }
}
3355
/// A tagged entry commented out with a leading `#` is listed as paused ("yes").
#[test]
fn test_crontab_list_paused_entry() {
    let listing = crontab_list("#0 3 * * * pulpo spawn task #pulpo:paused-job\n");
    assert!(listing.contains("paused-job"));
    assert!(listing.contains("yes"));
}
3363
/// A tagged line that lacks a full cron expression is still listed by name,
/// and the listing contains a `?`.
#[test]
fn test_crontab_list_short_line() {
    let listing = crontab_list("badcron #pulpo:broken\n");
    assert!(listing.contains("broken"));
    assert!(listing.contains('?'));
}
3372
/// Removing a schedule drops its tagged line and keeps unrelated entries.
#[test]
fn test_crontab_remove_success() {
    let existing = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let updated = crontab_remove(existing, "my-job").unwrap();
    assert!(updated.contains("echo hi"));
    assert!(!updated.contains("my-job"));
}
3380
/// Removing a schedule name that is not in the crontab is an error.
#[test]
fn test_crontab_remove_not_found() {
    let message = crontab_remove("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found"));
}
3387
/// Pausing an entry yields a crontab that starts with `#` and keeps the tag.
#[test]
fn test_crontab_pause_success() {
    let existing = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let updated = crontab_pause(existing, "my-job").unwrap();
    assert!(updated.starts_with('#'));
    assert!(updated.contains("#pulpo:my-job"));
}
3395
/// Pausing a schedule name that is not in the crontab is an error.
#[test]
fn test_crontab_pause_not_found() {
    let message = crontab_pause("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found or already paused"));
}
3402
/// Pausing an entry that is already commented out is an error.
#[test]
fn test_crontab_pause_already_paused() {
    let existing = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let message = crontab_pause(existing, "my-job").unwrap_err().to_string();
    assert!(message.contains("already paused"));
}
3409
/// Resuming a paused entry yields a crontab that no longer starts with `#`
/// and still carries the tag.
#[test]
fn test_crontab_resume_success() {
    let existing = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let updated = crontab_resume(existing, "my-job").unwrap();
    assert!(!updated.starts_with('#'));
    assert!(updated.contains("#pulpo:my-job"));
}
3417
/// Resuming a schedule name that is not in the crontab is an error.
#[test]
fn test_crontab_resume_not_found() {
    let message = crontab_resume("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found or not paused"));
}
3424
/// Resuming an entry that is not commented out is an error.
#[test]
fn test_crontab_resume_not_paused() {
    let existing = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let message = crontab_resume(existing, "my-job").unwrap_err().to_string();
    assert!(message.contains("not paused"));
}
3431
/// `schedule install` captures name, cron, workdir and the multi-word prompt,
/// and the provider comes out as `claude` when `--provider` is not given.
#[test]
fn test_cli_parse_schedule_install() {
    let argv = [
        "pulpo",
        "schedule",
        "install",
        "nightly",
        "0 3 * * *",
        "--workdir",
        "/tmp/repo",
        "Review",
        "PRs",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Schedule {
        action:
            ScheduleAction::Install {
                name,
                cron,
                workdir,
                provider,
                prompt,
            },
    } = &parsed.command
    else {
        panic!("expected Schedule Install command");
    };
    assert_eq!(name, "nightly");
    assert_eq!(cron, "0 3 * * *");
    assert_eq!(workdir, "/tmp/repo");
    assert_eq!(provider, "claude");
    assert_eq!(prompt, &["Review", "PRs"]);
}
3456
/// `schedule list` parses into `ScheduleAction::List`.
#[test]
fn test_cli_parse_schedule_list() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Commands::Schedule {
            action: ScheduleAction::List
        }
    );
    assert!(is_list);
}
3467
/// `schedule remove <name>` parses into `ScheduleAction::Remove`.
#[test]
fn test_cli_parse_schedule_remove() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Remove { name },
    } = &parsed.command
    else {
        panic!("expected Schedule Remove command");
    };
    assert_eq!(name, "nightly");
}
3478
/// `schedule pause <name>` parses into `ScheduleAction::Pause`.
#[test]
fn test_cli_parse_schedule_pause() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Pause { name },
    } = &parsed.command
    else {
        panic!("expected Schedule Pause command");
    };
    assert_eq!(name, "nightly");
}
3489
/// `schedule resume <name>` parses into `ScheduleAction::Resume`.
#[test]
fn test_cli_parse_schedule_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Resume { name },
    } = &parsed.command
    else {
        panic!("expected Schedule Resume command");
    };
    assert_eq!(name, "nightly");
}
3500
/// The alias `sched` is accepted in place of `schedule`.
#[test]
fn test_cli_parse_schedule_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Commands::Schedule {
            action: ScheduleAction::List
        }
    );
    assert!(is_list);
}
3511
/// `schedule ls` is accepted as an alias for `schedule list`.
#[test]
fn test_cli_parse_schedule_list_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Commands::Schedule {
            action: ScheduleAction::List
        }
    );
    assert!(is_list);
}
3522
/// `schedule rm <name>` is accepted as an alias for `schedule remove`.
#[test]
fn test_cli_parse_schedule_remove_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Remove { name },
    } = &parsed.command
    else {
        panic!("expected Schedule Remove command");
    };
    assert_eq!(name, "nightly");
}
3533
3534 #[test]
3535 fn test_cli_parse_schedule_install_custom_provider() {
3536 let cli = Cli::try_parse_from([
3537 "pulpo",
3538 "schedule",
3539 "install",
3540 "daily",
3541 "0 9 * * *",
3542 "--workdir",
3543 "/tmp",
3544 "--provider",
3545 "codex",
3546 "Run tests",
3547 ])
3548 .unwrap();
3549 assert!(matches!(
3550 &cli.command,
3551 Commands::Schedule {
3552 action: ScheduleAction::Install { provider, .. }
3553 } if provider == "codex"
3554 ));
3555 }
3556
3557 #[tokio::test]
3558 async fn test_execute_schedule_via_execute() {
3559 let node = start_test_server().await;
3561 let cli = Cli {
3562 node,
3563 token: None,
3564 command: Commands::Schedule {
3565 action: ScheduleAction::List,
3566 },
3567 };
3568 let result = execute(&cli).await;
3569 assert!(result.is_ok() || result.is_err());
3571 }
3572
3573 #[test]
3574 fn test_schedule_action_debug() {
3575 let action = ScheduleAction::List;
3576 assert_eq!(format!("{action:?}"), "List");
3577 }
3578
3579 #[test]
3582 fn test_cli_parse_culture() {
3583 let cli = Cli::try_parse_from(["pulpo", "culture"]).unwrap();
3584 assert!(matches!(cli.command, Commands::Culture { .. }));
3585 }
3586
3587 #[test]
3588 fn test_cli_parse_culture_alias() {
3589 let cli = Cli::try_parse_from(["pulpo", "kn"]).unwrap();
3590 assert!(matches!(cli.command, Commands::Culture { .. }));
3591 }
3592
3593 #[test]
3594 fn test_cli_parse_culture_with_filters() {
3595 let cli = Cli::try_parse_from([
3596 "pulpo",
3597 "culture",
3598 "--kind",
3599 "failure",
3600 "--repo",
3601 "/tmp/repo",
3602 "--ink",
3603 "coder",
3604 "--limit",
3605 "5",
3606 ])
3607 .unwrap();
3608 match &cli.command {
3609 Commands::Culture {
3610 kind,
3611 repo,
3612 ink,
3613 limit,
3614 ..
3615 } => {
3616 assert_eq!(kind.as_deref(), Some("failure"));
3617 assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3618 assert_eq!(ink.as_deref(), Some("coder"));
3619 assert_eq!(*limit, 5);
3620 }
3621 _ => panic!("expected Culture command"),
3622 }
3623 }
3624
3625 #[test]
3626 fn test_cli_parse_culture_context() {
3627 let cli =
3628 Cli::try_parse_from(["pulpo", "culture", "--context", "--repo", "/tmp/repo"]).unwrap();
3629 match &cli.command {
3630 Commands::Culture { context, repo, .. } => {
3631 assert!(*context);
3632 assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3633 }
3634 _ => panic!("expected Culture command"),
3635 }
3636 }
3637
3638 #[test]
3639 fn test_format_culture_empty() {
3640 assert_eq!(format_culture(&[]), "No culture found.");
3641 }
3642
3643 #[test]
3644 fn test_format_culture_items() {
3645 use chrono::Utc;
3646 use pulpo_common::culture::{Culture, CultureKind};
3647 use uuid::Uuid;
3648
3649 let items = vec![
3650 Culture {
3651 id: Uuid::new_v4(),
3652 session_id: Uuid::new_v4(),
3653 kind: CultureKind::Summary,
3654 scope_repo: Some("/tmp/repo".into()),
3655 scope_ink: Some("coder".into()),
3656 title: "Fixed the auth bug".into(),
3657 body: "Details".into(),
3658 tags: vec!["claude".into(), "completed".into()],
3659 relevance: 0.7,
3660 created_at: Utc::now(),
3661 last_referenced_at: None,
3662 reference_count: 0,
3663 },
3664 Culture {
3665 id: Uuid::new_v4(),
3666 session_id: Uuid::new_v4(),
3667 kind: CultureKind::Failure,
3668 scope_repo: None,
3669 scope_ink: None,
3670 title: "OOM crash during build".into(),
3671 body: "Details".into(),
3672 tags: vec!["failure".into()],
3673 relevance: 0.9,
3674 created_at: Utc::now(),
3675 last_referenced_at: None,
3676 reference_count: 0,
3677 },
3678 ];
3679
3680 let output = format_culture(&items);
3681 assert!(output.contains("KIND"));
3682 assert!(output.contains("TITLE"));
3683 assert!(output.contains("summary"));
3684 assert!(output.contains("failure"));
3685 assert!(output.contains("Fixed the auth bug"));
3686 assert!(output.contains("repo"));
3687 assert!(output.contains("0.70"));
3688 }
3689
3690 #[test]
3691 fn test_format_culture_long_title_truncated() {
3692 use chrono::Utc;
3693 use pulpo_common::culture::{Culture, CultureKind};
3694 use uuid::Uuid;
3695
3696 let items = vec![Culture {
3697 id: Uuid::new_v4(),
3698 session_id: Uuid::new_v4(),
3699 kind: CultureKind::Summary,
3700 scope_repo: Some("/repo".into()),
3701 scope_ink: None,
3702 title: "A very long title that exceeds the maximum display width for culture items in the CLI".into(),
3703 body: "Body".into(),
3704 tags: vec![],
3705 relevance: 0.5,
3706 created_at: Utc::now(),
3707 last_referenced_at: None,
3708 reference_count: 0,
3709 }];
3710
3711 let output = format_culture(&items);
3712 assert!(output.contains('…'));
3713 }
3714
3715 #[test]
3716 fn test_cli_parse_culture_get() {
3717 let cli = Cli::try_parse_from(["pulpo", "culture", "--get", "abc-123"]).unwrap();
3718 match &cli.command {
3719 Commands::Culture { get, .. } => {
3720 assert_eq!(get.as_deref(), Some("abc-123"));
3721 }
3722 _ => panic!("expected Culture command"),
3723 }
3724 }
3725
3726 #[test]
3727 fn test_cli_parse_culture_delete() {
3728 let cli = Cli::try_parse_from(["pulpo", "culture", "--delete", "abc-123"]).unwrap();
3729 match &cli.command {
3730 Commands::Culture { delete, .. } => {
3731 assert_eq!(delete.as_deref(), Some("abc-123"));
3732 }
3733 _ => panic!("expected Culture command"),
3734 }
3735 }
3736
3737 #[test]
3738 fn test_cli_parse_culture_push() {
3739 let cli = Cli::try_parse_from(["pulpo", "culture", "--push"]).unwrap();
3740 match &cli.command {
3741 Commands::Culture { push, .. } => {
3742 assert!(*push);
3743 }
3744 _ => panic!("expected Culture command"),
3745 }
3746 }
3747
3748 #[test]
3749 fn test_format_culture_no_repo() {
3750 use chrono::Utc;
3751 use pulpo_common::culture::{Culture, CultureKind};
3752 use uuid::Uuid;
3753
3754 let items = vec![Culture {
3755 id: Uuid::new_v4(),
3756 session_id: Uuid::new_v4(),
3757 kind: CultureKind::Summary,
3758 scope_repo: None,
3759 scope_ink: None,
3760 title: "Global finding".into(),
3761 body: "Body".into(),
3762 tags: vec![],
3763 relevance: 0.5,
3764 created_at: Utc::now(),
3765 last_referenced_at: None,
3766 reference_count: 0,
3767 }];
3768
3769 let output = format_culture(&items);
3770 assert!(output.contains('-')); }
3772}