1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4 AuthTokenResponse, CreateSessionResponse, CultureDeleteResponse, CultureItemResponse,
5 CulturePushResponse, CultureResponse, InterventionEventResponse, PeersResponse,
6 ProvidersResponse,
7};
8#[cfg(test)]
9use pulpo_common::api::{ProviderCapabilitiesResponse, ProviderInfoResponse};
10use pulpo_common::culture::Culture;
11use pulpo_common::session::Session;
12
13#[derive(Parser, Debug)]
14#[command(
15 name = "pulpo",
16 about = "Manage agent sessions across your machines",
17 version
18)]
19pub struct Cli {
20 #[arg(long, default_value = "localhost:7433")]
22 pub node: String,
23
24 #[arg(long)]
26 pub token: Option<String>,
27
28 #[command(subcommand)]
29 pub command: Commands,
30}
31
32#[derive(Subcommand, Debug)]
33#[allow(clippy::large_enum_variant)]
34pub enum Commands {
35 #[command(visible_alias = "a")]
37 Attach {
38 name: String,
40 },
41
42 #[command(visible_alias = "i")]
44 Input {
45 name: String,
47 text: Option<String>,
49 },
50
51 #[command(visible_alias = "s")]
53 Spawn {
54 name: String,
56
57 #[arg(long)]
59 workdir: Option<String>,
60
61 #[arg(long)]
63 provider: Option<String>,
64
65 #[arg(long)]
67 auto: bool,
68
69 #[arg(long)]
71 unrestricted: bool,
72
73 #[arg(long)]
75 model: Option<String>,
76
77 #[arg(long)]
79 system_prompt: Option<String>,
80
81 #[arg(long, value_delimiter = ',')]
83 allowed_tools: Option<Vec<String>>,
84
85 #[arg(long)]
87 ink: Option<String>,
88
89 #[arg(long)]
91 max_turns: Option<u32>,
92
93 #[arg(long)]
95 max_budget: Option<f64>,
96
97 #[arg(long)]
99 output_format: Option<String>,
100
101 #[arg(long)]
103 worktree: bool,
104
105 #[arg(long)]
107 conversation_id: Option<String>,
108
109 #[arg(short, long)]
111 detach: bool,
112
113 prompt: Vec<String>,
115 },
116
117 #[command(visible_alias = "ls")]
119 List,
120
121 #[command(visible_alias = "l")]
123 Logs {
124 name: String,
126
127 #[arg(long, default_value = "100")]
129 lines: usize,
130
131 #[arg(short, long)]
133 follow: bool,
134 },
135
136 #[command(visible_alias = "k")]
138 Kill {
139 name: String,
141 },
142
143 #[command(visible_alias = "rm")]
145 Delete {
146 name: String,
148 },
149
150 #[command(visible_alias = "r")]
152 Resume {
153 name: String,
155 },
156
157 #[command(visible_alias = "n")]
159 Nodes,
160
161 #[command(visible_alias = "iv")]
163 Interventions {
164 name: String,
166 },
167
168 #[command(visible_alias = "p")]
170 Providers,
171
172 Ui,
174
175 #[command(visible_alias = "kn")]
177 Culture {
178 #[arg(long)]
180 session: Option<String>,
181
182 #[arg(long)]
184 kind: Option<String>,
185
186 #[arg(long)]
188 repo: Option<String>,
189
190 #[arg(long)]
192 ink: Option<String>,
193
194 #[arg(long, default_value = "20")]
196 limit: usize,
197
198 #[arg(long)]
200 context: bool,
201
202 #[arg(long)]
204 get: Option<String>,
205
206 #[arg(long)]
208 delete: Option<String>,
209
210 #[arg(long)]
212 push: bool,
213 },
214
215 #[command(visible_alias = "sched")]
217 Schedule {
218 #[command(subcommand)]
219 action: ScheduleAction,
220 },
221}
222
223#[derive(Subcommand, Debug)]
224pub enum ScheduleAction {
225 Install {
227 name: String,
229 cron: String,
231 #[arg(long)]
233 workdir: String,
234 #[arg(long, default_value = "claude")]
236 provider: String,
237 prompt: Vec<String>,
239 },
240 #[command(alias = "ls")]
242 List,
243 #[command(alias = "rm")]
245 Remove {
246 name: String,
248 },
249 Pause {
251 name: String,
253 },
254 Resume {
256 name: String,
258 },
259}
260
261pub fn base_url(node: &str) -> String {
263 format!("http://{node}")
264}
265
266#[derive(serde::Deserialize)]
268struct OutputResponse {
269 output: String,
270}
271
272fn format_sessions(sessions: &[Session]) -> String {
274 if sessions.is_empty() {
275 return "No sessions.".into();
276 }
277 let mut lines = vec![format!(
278 "{:<20} {:<12} {:<10} {:<14} {}",
279 "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
280 )];
281 for s in sessions {
282 let prompt_display = if s.prompt.len() > 40 {
283 format!("{}...", &s.prompt[..37])
284 } else {
285 s.prompt.clone()
286 };
287 let status_display = s.status.to_string();
288 lines.push(format!(
289 "{:<20} {:<12} {:<10} {:<14} {}",
290 s.name, status_display, s.provider, s.mode, prompt_display
291 ));
292 }
293 lines.join("\n")
294}
295
296fn format_nodes(resp: &PeersResponse) -> String {
298 let mut lines = vec![format!(
299 "{:<20} {:<25} {:<10} {}",
300 "NAME", "ADDRESS", "STATUS", "SESSIONS"
301 )];
302 lines.push(format!(
303 "{:<20} {:<25} {:<10} {}",
304 resp.local.name, "(local)", "online", "-"
305 ));
306 for p in &resp.peers {
307 let sessions = p
308 .session_count
309 .map_or_else(|| "-".into(), |c| c.to_string());
310 lines.push(format!(
311 "{:<20} {:<25} {:<10} {}",
312 p.name, p.address, p.status, sessions
313 ));
314 }
315 lines.join("\n")
316}
317
318fn format_interventions(events: &[InterventionEventResponse]) -> String {
320 if events.is_empty() {
321 return "No intervention events.".into();
322 }
323 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
324 for e in events {
325 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
326 }
327 lines.join("\n")
328}
329
330fn format_culture(items: &[Culture]) -> String {
332 if items.is_empty() {
333 return "No culture found.".into();
334 }
335 let mut lines = vec![format!(
336 "{:<10} {:<40} {:<10} {:<6} {}",
337 "KIND", "TITLE", "REPO", "REL", "TAGS"
338 )];
339 for k in items {
340 let title = if k.title.len() > 38 {
341 format!("{}…", &k.title[..37])
342 } else {
343 k.title.clone()
344 };
345 let repo = k
346 .scope_repo
347 .as_deref()
348 .and_then(|r| r.rsplit('/').next())
349 .unwrap_or("-");
350 let tags = k.tags.join(",");
351 lines.push(format!(
352 "{:<10} {:<40} {:<10} {:<6.2} {}",
353 k.kind, title, repo, k.relevance, tags
354 ));
355 }
356 lines.join("\n")
357}
358
359fn format_providers(resp: &ProvidersResponse) -> String {
361 let mut lines = vec![format!(
362 "{:<12} {:<10} {:<20} {}",
363 "PROVIDER", "AVAILABLE", "BINARY", "CAPABILITIES"
364 )];
365 for p in &resp.providers {
366 let avail = if p.available { "yes" } else { "no" };
367 let mut caps = Vec::new();
368 let c = &p.capabilities;
369 if c.model {
370 caps.push("model");
371 }
372 if c.system_prompt {
373 caps.push("system-prompt");
374 }
375 if c.allowed_tools {
376 caps.push("allowed-tools");
377 }
378 if c.max_turns {
379 caps.push("max-turns");
380 }
381 if c.max_budget_usd {
382 caps.push("max-budget");
383 }
384 if c.output_format {
385 caps.push("output-format");
386 }
387 if c.worktree {
388 caps.push("worktree");
389 }
390 if c.unrestricted {
391 caps.push("unrestricted");
392 }
393 if c.resume {
394 caps.push("resume");
395 }
396 let caps_str = if caps.is_empty() {
397 "-".to_owned()
398 } else {
399 caps.join(", ")
400 };
401 lines.push(format!(
402 "{:<12} {:<10} {:<20} {}",
403 p.provider, avail, p.binary, caps_str
404 ));
405 }
406 lines.join("\n")
407}
408
409#[cfg_attr(coverage, allow(dead_code))]
411fn build_open_command(url: &str) -> std::process::Command {
412 #[cfg(target_os = "macos")]
413 {
414 let mut cmd = std::process::Command::new("open");
415 cmd.arg(url);
416 cmd
417 }
418 #[cfg(target_os = "linux")]
419 {
420 let mut cmd = std::process::Command::new("xdg-open");
421 cmd.arg(url);
422 cmd
423 }
424 #[cfg(not(any(target_os = "macos", target_os = "linux")))]
425 {
426 let mut cmd = std::process::Command::new("xdg-open");
428 cmd.arg(url);
429 cmd
430 }
431}
432
433#[cfg(not(coverage))]
435fn open_browser(url: &str) -> Result<()> {
436 build_open_command(url).status()?;
437 Ok(())
438}
439
440#[cfg(coverage)]
442fn open_browser(_url: &str) -> Result<()> {
443 Ok(())
444}
445
446#[cfg_attr(coverage, allow(dead_code))]
449fn build_attach_command(backend_session_id: &str) -> std::process::Command {
450 let mut cmd = std::process::Command::new("tmux");
451 cmd.args(["attach-session", "-t", backend_session_id]);
452 cmd
453}
454
455#[cfg(not(any(test, coverage)))]
457fn attach_session(backend_session_id: &str) -> Result<()> {
458 let status = build_attach_command(backend_session_id).status()?;
459 if !status.success() {
460 anyhow::bail!("attach failed with {status}");
461 }
462 Ok(())
463}
464
465#[cfg(any(test, coverage))]
467#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
468fn attach_session(_backend_session_id: &str) -> Result<()> {
469 Ok(())
470}
471
472fn api_error(text: &str) -> anyhow::Error {
474 serde_json::from_str::<serde_json::Value>(text)
475 .ok()
476 .and_then(|v| v["error"].as_str().map(String::from))
477 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
478}
479
480async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
482 if resp.status().is_success() {
483 Ok(resp.text().await?)
484 } else {
485 let text = resp.text().await?;
486 Err(api_error(&text))
487 }
488}
489
490fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
492 if err.is_connect() {
493 anyhow::anyhow!(
494 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
495 )
496 } else {
497 anyhow::anyhow!("Network error connecting to {node}: {err}")
498 }
499}
500
501fn is_localhost(node: &str) -> bool {
503 let host = node.split(':').next().unwrap_or(node);
504 host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
505}
506
507async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
509 let resp = client
510 .get(format!("{base}/api/v1/auth/token"))
511 .send()
512 .await
513 .ok()?;
514 let body: AuthTokenResponse = resp.json().await.ok()?;
515 if body.token.is_empty() {
516 None
517 } else {
518 Some(body.token)
519 }
520}
521
522async fn resolve_token(
524 client: &reqwest::Client,
525 base: &str,
526 node: &str,
527 explicit: Option<&str>,
528) -> Option<String> {
529 if let Some(t) = explicit {
530 return Some(t.to_owned());
531 }
532 if is_localhost(node) {
533 return discover_token(client, base).await;
534 }
535 None
536}
537
538fn authed_get(
540 client: &reqwest::Client,
541 url: String,
542 token: Option<&str>,
543) -> reqwest::RequestBuilder {
544 let req = client.get(url);
545 if let Some(t) = token {
546 req.bearer_auth(t)
547 } else {
548 req
549 }
550}
551
552fn authed_post(
554 client: &reqwest::Client,
555 url: String,
556 token: Option<&str>,
557) -> reqwest::RequestBuilder {
558 let req = client.post(url);
559 if let Some(t) = token {
560 req.bearer_auth(t)
561 } else {
562 req
563 }
564}
565
566fn authed_delete(
568 client: &reqwest::Client,
569 url: String,
570 token: Option<&str>,
571) -> reqwest::RequestBuilder {
572 let req = client.delete(url);
573 if let Some(t) = token {
574 req.bearer_auth(t)
575 } else {
576 req
577 }
578}
579
580async fn fetch_output(
582 client: &reqwest::Client,
583 base: &str,
584 name: &str,
585 lines: usize,
586 token: Option<&str>,
587) -> Result<String> {
588 let resp = authed_get(
589 client,
590 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
591 token,
592 )
593 .send()
594 .await?;
595 let text = ok_or_api_error(resp).await?;
596 let output: OutputResponse = serde_json::from_str(&text)?;
597 Ok(output.output)
598}
599
600async fn fetch_session_status(
602 client: &reqwest::Client,
603 base: &str,
604 name: &str,
605 token: Option<&str>,
606) -> Result<String> {
607 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
608 .send()
609 .await?;
610 let text = ok_or_api_error(resp).await?;
611 let session: Session = serde_json::from_str(&text)?;
612 Ok(session.status.to_string())
613}
614
615fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
622 if prev.is_empty() {
623 return new;
624 }
625
626 let prev_lines: Vec<&str> = prev.lines().collect();
627 let new_lines: Vec<&str> = new.lines().collect();
628
629 if new_lines.is_empty() {
630 return "";
631 }
632
633 let last_prev = prev_lines[prev_lines.len() - 1];
635
636 for i in (0..new_lines.len()).rev() {
638 if new_lines[i] == last_prev {
639 let overlap_len = prev_lines.len().min(i + 1);
641 let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
642 let new_overlap = &new_lines[i + 1 - overlap_len..=i];
643 if prev_tail == new_overlap {
644 if i + 1 < new_lines.len() {
645 let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
647 return new.get(consumed.min(new.len())..).unwrap_or("");
648 }
649 return "";
650 }
651 }
652 }
653
654 new
656}
657
658async fn follow_logs(
660 client: &reqwest::Client,
661 base: &str,
662 name: &str,
663 lines: usize,
664 token: Option<&str>,
665 writer: &mut (dyn std::io::Write + Send),
666) -> Result<()> {
667 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
668 write!(writer, "{prev_output}")?;
669
670 loop {
671 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
672
673 let status = fetch_session_status(client, base, name, token).await?;
675 let is_terminal = status == "finished" || status == "killed" || status == "lost";
676
677 let new_output = fetch_output(client, base, name, lines, token).await?;
679
680 let diff = diff_output(&prev_output, &new_output);
681 if !diff.is_empty() {
682 write!(writer, "{diff}")?;
683 }
684 prev_output = new_output;
685
686 if is_terminal {
687 break;
688 }
689 }
690 Ok(())
691}
692
693#[cfg_attr(coverage, allow(dead_code))]
696const CRONTAB_TAG: &str = "#pulpo:";
697
698#[cfg(not(coverage))]
700fn read_crontab() -> Result<String> {
701 let output = std::process::Command::new("crontab").arg("-l").output()?;
702 if output.status.success() {
703 Ok(String::from_utf8_lossy(&output.stdout).to_string())
704 } else {
705 Ok(String::new())
706 }
707}
708
709#[cfg(not(coverage))]
711fn write_crontab(content: &str) -> Result<()> {
712 use std::io::Write;
713 let mut child = std::process::Command::new("crontab")
714 .arg("-")
715 .stdin(std::process::Stdio::piped())
716 .spawn()?;
717 child
718 .stdin
719 .as_mut()
720 .unwrap()
721 .write_all(content.as_bytes())?;
722 let status = child.wait()?;
723 if !status.success() {
724 anyhow::bail!("crontab write failed");
725 }
726 Ok(())
727}
728
729#[cfg_attr(coverage, allow(dead_code))]
731fn build_crontab_line(
732 name: &str,
733 cron: &str,
734 workdir: &str,
735 provider: &str,
736 prompt: &str,
737 node: &str,
738) -> String {
739 format!(
740 "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
741 )
742}
743
744#[cfg_attr(coverage, allow(dead_code))]
746fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
747 let tag = format!("{CRONTAB_TAG}{name}");
748 if crontab.contains(&tag) {
749 anyhow::bail!("schedule \"{name}\" already exists — remove it first");
750 }
751 let mut result = crontab.to_owned();
752 result.push_str(line);
753 Ok(result)
754}
755
756#[cfg_attr(coverage, allow(dead_code))]
758fn crontab_list(crontab: &str) -> String {
759 let entries: Vec<&str> = crontab
760 .lines()
761 .filter(|l| l.contains(CRONTAB_TAG))
762 .collect();
763 if entries.is_empty() {
764 return "No pulpo schedules.".into();
765 }
766 let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
767 for entry in entries {
768 let paused = entry.starts_with('#');
769 let raw = entry.trim_start_matches('#').trim();
770 let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
771 let parts: Vec<&str> = raw.splitn(6, ' ').collect();
772 let cron_expr = if parts.len() >= 5 {
773 parts[..5].join(" ")
774 } else {
775 "?".into()
776 };
777 lines.push(format!(
778 "{:<20} {:<15} {}",
779 name,
780 cron_expr,
781 if paused { "yes" } else { "no" }
782 ));
783 }
784 lines.join("\n")
785}
786
787#[cfg_attr(coverage, allow(dead_code))]
789fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
790 use std::fmt::Write;
791 let tag = format!("{CRONTAB_TAG}{name}");
792 let filtered =
793 crontab
794 .lines()
795 .filter(|l| !l.contains(&tag))
796 .fold(String::new(), |mut acc, l| {
797 writeln!(acc, "{l}").unwrap();
798 acc
799 });
800 if filtered.len() == crontab.len() {
801 anyhow::bail!("schedule \"{name}\" not found");
802 }
803 Ok(filtered)
804}
805
806#[cfg_attr(coverage, allow(dead_code))]
808fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
809 use std::fmt::Write;
810 let tag = format!("{CRONTAB_TAG}{name}");
811 let mut found = false;
812 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
813 if l.contains(&tag) && !l.starts_with('#') {
814 found = true;
815 writeln!(acc, "#{l}").unwrap();
816 } else {
817 writeln!(acc, "{l}").unwrap();
818 }
819 acc
820 });
821 if !found {
822 anyhow::bail!("schedule \"{name}\" not found or already paused");
823 }
824 Ok(updated)
825}
826
827#[cfg_attr(coverage, allow(dead_code))]
829fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
830 use std::fmt::Write;
831 let tag = format!("{CRONTAB_TAG}{name}");
832 let mut found = false;
833 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
834 if l.contains(&tag) && l.starts_with('#') {
835 found = true;
836 writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
837 } else {
838 writeln!(acc, "{l}").unwrap();
839 }
840 acc
841 });
842 if !found {
843 anyhow::bail!("schedule \"{name}\" not found or not paused");
844 }
845 Ok(updated)
846}
847
848#[cfg(not(coverage))]
850fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
851 match action {
852 ScheduleAction::Install {
853 name,
854 cron,
855 workdir,
856 provider,
857 prompt,
858 } => {
859 let crontab = read_crontab()?;
860 let joined_prompt = prompt.join(" ");
861 let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
862 let updated = crontab_install(&crontab, name, &line)?;
863 write_crontab(&updated)?;
864 Ok(format!("Installed schedule \"{name}\""))
865 }
866 ScheduleAction::List => {
867 let crontab = read_crontab()?;
868 Ok(crontab_list(&crontab))
869 }
870 ScheduleAction::Remove { name } => {
871 let crontab = read_crontab()?;
872 let updated = crontab_remove(&crontab, name)?;
873 write_crontab(&updated)?;
874 Ok(format!("Removed schedule \"{name}\""))
875 }
876 ScheduleAction::Pause { name } => {
877 let crontab = read_crontab()?;
878 let updated = crontab_pause(&crontab, name)?;
879 write_crontab(&updated)?;
880 Ok(format!("Paused schedule \"{name}\""))
881 }
882 ScheduleAction::Resume { name } => {
883 let crontab = read_crontab()?;
884 let updated = crontab_resume(&crontab, name)?;
885 write_crontab(&updated)?;
886 Ok(format!("Resumed schedule \"{name}\""))
887 }
888 }
889}
890
891#[cfg(coverage)]
893fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
894 Ok(String::new())
895}
896
897#[allow(clippy::too_many_lines)]
899pub async fn execute(cli: &Cli) -> Result<String> {
900 let url = base_url(&cli.node);
901 let client = reqwest::Client::new();
902 let node = &cli.node;
903 let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;
904
905 match &cli.command {
906 Commands::Attach { name } => {
907 let resp = authed_get(
909 &client,
910 format!("{url}/api/v1/sessions/{name}"),
911 token.as_deref(),
912 )
913 .send()
914 .await
915 .map_err(|e| friendly_error(&e, node))?;
916 let text = ok_or_api_error(resp).await?;
917 let session: Session = serde_json::from_str(&text)?;
918 match session.status.to_string().as_str() {
919 "lost" => {
920 anyhow::bail!(
921 "Session \"{name}\" is lost (agent process died). Resume it first:\n pulpo resume {name}"
922 );
923 }
924 "finished" | "killed" => {
925 anyhow::bail!(
926 "Session \"{name}\" is {} — cannot attach to a finished session.",
927 session.status
928 );
929 }
930 _ => {}
931 }
932 let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
933 attach_session(&backend_id)?;
934 Ok(format!("Detached from session {name}."))
935 }
936 Commands::Input { name, text } => {
937 let input_text = text.as_deref().unwrap_or("\n");
938 let body = serde_json::json!({ "text": input_text });
939 let resp = authed_post(
940 &client,
941 format!("{url}/api/v1/sessions/{name}/input"),
942 token.as_deref(),
943 )
944 .json(&body)
945 .send()
946 .await
947 .map_err(|e| friendly_error(&e, node))?;
948 ok_or_api_error(resp).await?;
949 Ok(format!("Sent input to session {name}."))
950 }
951 Commands::List => {
952 let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
953 .send()
954 .await
955 .map_err(|e| friendly_error(&e, node))?;
956 let text = ok_or_api_error(resp).await?;
957 let sessions: Vec<Session> = serde_json::from_str(&text)?;
958 Ok(format_sessions(&sessions))
959 }
960 Commands::Providers => {
961 let resp = authed_get(&client, format!("{url}/api/v1/providers"), token.as_deref())
962 .send()
963 .await
964 .map_err(|e| friendly_error(&e, node))?;
965 let text = ok_or_api_error(resp).await?;
966 let resp: ProvidersResponse = serde_json::from_str(&text)?;
967 Ok(format_providers(&resp))
968 }
969 Commands::Nodes => {
970 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
971 .send()
972 .await
973 .map_err(|e| friendly_error(&e, node))?;
974 let text = ok_or_api_error(resp).await?;
975 let resp: PeersResponse = serde_json::from_str(&text)?;
976 Ok(format_nodes(&resp))
977 }
978 Commands::Spawn {
979 workdir,
980 name,
981 provider,
982 auto,
983 unrestricted,
984 model,
985 system_prompt,
986 allowed_tools,
987 ink,
988 max_turns,
989 max_budget,
990 output_format,
991 worktree,
992 conversation_id,
993 detach,
994 prompt,
995 } => {
996 let prompt_text = prompt.join(" ");
997 let mode = if *auto { "autonomous" } else { "interactive" };
998 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
1000 std::env::current_dir()
1001 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
1002 });
1003 let mut body = serde_json::json!({
1004 "name": name,
1005 "workdir": resolved_workdir,
1006 "mode": mode,
1007 });
1008 if !prompt_text.is_empty() {
1010 body["prompt"] = serde_json::json!(prompt_text);
1011 }
1012 if let Some(p) = provider {
1014 body["provider"] = serde_json::json!(p);
1015 }
1016 if *unrestricted {
1017 body["unrestricted"] = serde_json::json!(true);
1018 }
1019 if let Some(m) = model {
1020 body["model"] = serde_json::json!(m);
1021 }
1022 if let Some(sp) = system_prompt {
1023 body["system_prompt"] = serde_json::json!(sp);
1024 }
1025 if let Some(tools) = allowed_tools {
1026 body["allowed_tools"] = serde_json::json!(tools);
1027 }
1028 if let Some(p) = ink {
1029 body["ink"] = serde_json::json!(p);
1030 }
1031 if let Some(mt) = max_turns {
1032 body["max_turns"] = serde_json::json!(mt);
1033 }
1034 if let Some(mb) = max_budget {
1035 body["max_budget_usd"] = serde_json::json!(mb);
1036 }
1037 if let Some(of) = output_format {
1038 body["output_format"] = serde_json::json!(of);
1039 }
1040 if *worktree {
1041 body["worktree"] = serde_json::json!(true);
1042 }
1043 if let Some(cid) = conversation_id {
1044 body["conversation_id"] = serde_json::json!(cid);
1045 }
1046 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
1047 .json(&body)
1048 .send()
1049 .await
1050 .map_err(|e| friendly_error(&e, node))?;
1051 let text = ok_or_api_error(resp).await?;
1052 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
1053 let mut msg = format!(
1054 "Created session \"{}\" ({})",
1055 resp.session.name, resp.session.id
1056 );
1057 for w in &resp.warnings {
1058 use std::fmt::Write;
1059 let _ = write!(msg, "\n Warning: {w}");
1060 }
1061 if !detach {
1062 let backend_id = resp
1063 .session
1064 .backend_session_id
1065 .as_deref()
1066 .unwrap_or(&resp.session.name);
1067 eprintln!("{msg}");
1068 attach_session(backend_id)?;
1069 return Ok(format!("Detached from session \"{}\".", resp.session.name));
1070 }
1071 Ok(msg)
1072 }
1073 Commands::Kill { name } => {
1074 let resp = authed_post(
1075 &client,
1076 format!("{url}/api/v1/sessions/{name}/kill"),
1077 token.as_deref(),
1078 )
1079 .send()
1080 .await
1081 .map_err(|e| friendly_error(&e, node))?;
1082 ok_or_api_error(resp).await?;
1083 Ok(format!("Session {name} killed."))
1084 }
1085 Commands::Delete { name } => {
1086 let resp = authed_delete(
1087 &client,
1088 format!("{url}/api/v1/sessions/{name}"),
1089 token.as_deref(),
1090 )
1091 .send()
1092 .await
1093 .map_err(|e| friendly_error(&e, node))?;
1094 ok_or_api_error(resp).await?;
1095 Ok(format!("Session {name} deleted."))
1096 }
1097 Commands::Logs {
1098 name,
1099 lines,
1100 follow,
1101 } => {
1102 if *follow {
1103 let mut stdout = std::io::stdout();
1104 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1105 .await
1106 .map_err(|e| {
1107 match e.downcast::<reqwest::Error>() {
1109 Ok(re) => friendly_error(&re, node),
1110 Err(other) => other,
1111 }
1112 })?;
1113 Ok(String::new())
1114 } else {
1115 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1116 .await
1117 .map_err(|e| match e.downcast::<reqwest::Error>() {
1118 Ok(re) => friendly_error(&re, node),
1119 Err(other) => other,
1120 })?;
1121 Ok(output)
1122 }
1123 }
1124 Commands::Interventions { name } => {
1125 let resp = authed_get(
1126 &client,
1127 format!("{url}/api/v1/sessions/{name}/interventions"),
1128 token.as_deref(),
1129 )
1130 .send()
1131 .await
1132 .map_err(|e| friendly_error(&e, node))?;
1133 let text = ok_or_api_error(resp).await?;
1134 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1135 Ok(format_interventions(&events))
1136 }
1137 Commands::Ui => {
1138 let dashboard = base_url(&cli.node);
1139 open_browser(&dashboard)?;
1140 Ok(format!("Opening {dashboard}"))
1141 }
1142 Commands::Resume { name } => {
1143 let resp = authed_post(
1144 &client,
1145 format!("{url}/api/v1/sessions/{name}/resume"),
1146 token.as_deref(),
1147 )
1148 .send()
1149 .await
1150 .map_err(|e| friendly_error(&e, node))?;
1151 let text = ok_or_api_error(resp).await?;
1152 let session: Session = serde_json::from_str(&text)?;
1153 let backend_id = session
1154 .backend_session_id
1155 .as_deref()
1156 .unwrap_or(&session.name);
1157 eprintln!("Resumed session \"{}\"", session.name);
1158 attach_session(backend_id)?;
1159 Ok(format!("Detached from session \"{}\".", session.name))
1160 }
1161 Commands::Culture {
1162 session,
1163 kind,
1164 repo,
1165 ink,
1166 limit,
1167 context,
1168 get,
1169 delete,
1170 push,
1171 } => {
1172 if let Some(id) = get {
1174 let endpoint = format!("{url}/api/v1/culture/{id}");
1175 let resp = authed_get(&client, endpoint, token.as_deref())
1176 .send()
1177 .await
1178 .map_err(|e| friendly_error(&e, node))?;
1179 let text = ok_or_api_error(resp).await?;
1180 let resp: CultureItemResponse = serde_json::from_str(&text)?;
1181 return Ok(format_culture(&[resp.culture]));
1182 }
1183
1184 if let Some(id) = delete {
1186 let endpoint = format!("{url}/api/v1/culture/{id}");
1187 let resp = authed_delete(&client, endpoint, token.as_deref())
1188 .send()
1189 .await
1190 .map_err(|e| friendly_error(&e, node))?;
1191 let text = ok_or_api_error(resp).await?;
1192 let resp: CultureDeleteResponse = serde_json::from_str(&text)?;
1193 return Ok(if resp.deleted {
1194 format!("Deleted culture item {id}")
1195 } else {
1196 format!("Culture item {id} not found")
1197 });
1198 }
1199
1200 if *push {
1202 let endpoint = format!("{url}/api/v1/culture/push");
1203 let resp = authed_post(&client, endpoint, token.as_deref())
1204 .send()
1205 .await
1206 .map_err(|e| friendly_error(&e, node))?;
1207 let text = ok_or_api_error(resp).await?;
1208 let resp: CulturePushResponse = serde_json::from_str(&text)?;
1209 return Ok(resp.message);
1210 }
1211
1212 let mut params = vec![format!("limit={limit}")];
1214 let endpoint = if *context {
1215 if let Some(r) = repo {
1216 params.push(format!("workdir={r}"));
1217 }
1218 if let Some(i) = ink {
1219 params.push(format!("ink={i}"));
1220 }
1221 format!("{url}/api/v1/culture/context?{}", params.join("&"))
1222 } else {
1223 if let Some(s) = session {
1224 params.push(format!("session_id={s}"));
1225 }
1226 if let Some(k) = kind {
1227 params.push(format!("kind={k}"));
1228 }
1229 if let Some(r) = repo {
1230 params.push(format!("repo={r}"));
1231 }
1232 if let Some(i) = ink {
1233 params.push(format!("ink={i}"));
1234 }
1235 format!("{url}/api/v1/culture?{}", params.join("&"))
1236 };
1237 let resp = authed_get(&client, endpoint, token.as_deref())
1238 .send()
1239 .await
1240 .map_err(|e| friendly_error(&e, node))?;
1241 let text = ok_or_api_error(resp).await?;
1242 let resp: CultureResponse = serde_json::from_str(&text)?;
1243 Ok(format_culture(&resp.culture))
1244 }
1245 Commands::Schedule { action } => execute_schedule(action, node),
1246 }
1247}
1248
1249#[cfg(test)]
1250mod tests {
1251 use super::*;
1252 use pulpo_common::session::Provider;
1253
1254 #[test]
1255 fn test_base_url() {
1256 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1257 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1258 }
1259
1260 #[test]
1261 fn test_cli_parse_list() {
1262 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1263 assert_eq!(cli.node, "localhost:7433");
1264 assert!(matches!(cli.command, Commands::List));
1265 }
1266
1267 #[test]
1268 fn test_cli_parse_nodes() {
1269 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1270 assert!(matches!(cli.command, Commands::Nodes));
1271 }
1272
1273 #[test]
1274 fn test_cli_parse_providers() {
1275 let cli = Cli::try_parse_from(["pulpo", "providers"]).unwrap();
1276 assert!(matches!(cli.command, Commands::Providers));
1277 }
1278
1279 #[test]
1280 fn test_cli_parse_providers_alias() {
1281 let cli = Cli::try_parse_from(["pulpo", "p"]).unwrap();
1282 assert!(matches!(cli.command, Commands::Providers));
1283 }
1284
1285 #[test]
1286 fn test_format_providers_all() {
1287 let resp = ProvidersResponse {
1288 providers: vec![
1289 ProviderInfoResponse {
1290 provider: Provider::Claude,
1291 binary: "claude".into(),
1292 available: true,
1293 capabilities: ProviderCapabilitiesResponse {
1294 model: true,
1295 system_prompt: true,
1296 allowed_tools: true,
1297 max_turns: true,
1298 max_budget_usd: true,
1299 output_format: true,
1300 worktree: true,
1301 unrestricted: true,
1302 resume: true,
1303 },
1304 },
1305 ProviderInfoResponse {
1306 provider: Provider::Shell,
1307 binary: "bash".into(),
1308 available: true,
1309 capabilities: ProviderCapabilitiesResponse {
1310 model: false,
1311 system_prompt: false,
1312 allowed_tools: false,
1313 max_turns: false,
1314 max_budget_usd: false,
1315 output_format: false,
1316 worktree: false,
1317 unrestricted: false,
1318 resume: false,
1319 },
1320 },
1321 ],
1322 };
1323 let output = format_providers(&resp);
1324 assert!(output.contains("PROVIDER"));
1325 assert!(output.contains("claude"));
1326 assert!(output.contains("yes"));
1327 assert!(output.contains("shell"));
1328 assert!(output.contains("bash"));
1329 assert!(output.contains('-'));
1331 assert!(output.contains("model"));
1333 assert!(output.contains("system-prompt"));
1334 assert!(output.contains("worktree"));
1335 }
1336
1337 #[test]
1338 fn test_format_providers_unavailable() {
1339 let resp = ProvidersResponse {
1340 providers: vec![ProviderInfoResponse {
1341 provider: Provider::Codex,
1342 binary: "codex".into(),
1343 available: false,
1344 capabilities: ProviderCapabilitiesResponse {
1345 model: true,
1346 system_prompt: false,
1347 allowed_tools: false,
1348 max_turns: false,
1349 max_budget_usd: false,
1350 output_format: false,
1351 worktree: false,
1352 unrestricted: false,
1353 resume: true,
1354 },
1355 }],
1356 };
1357 let output = format_providers(&resp);
1358 assert!(output.contains("no"));
1359 assert!(output.contains("codex"));
1360 }
1361
1362 #[test]
1363 fn test_cli_parse_ui() {
1364 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1365 assert!(matches!(cli.command, Commands::Ui));
1366 }
1367
1368 #[test]
1369 fn test_cli_parse_ui_custom_node() {
1370 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1371 assert!(matches!(cli.command, Commands::Ui));
1372 assert_eq!(cli.node, "mac-mini:7433");
1373 }
1374
1375 #[test]
1376 fn test_build_open_command() {
1377 let cmd = build_open_command("http://localhost:7433");
1378 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1379 assert_eq!(args, vec!["http://localhost:7433"]);
1380 #[cfg(target_os = "macos")]
1381 assert_eq!(cmd.get_program(), "open");
1382 #[cfg(target_os = "linux")]
1383 assert_eq!(cmd.get_program(), "xdg-open");
1384 }
1385
1386 #[test]
1387 fn test_cli_parse_spawn() {
1388 let cli = Cli::try_parse_from([
1389 "pulpo",
1390 "spawn",
1391 "my-task",
1392 "--workdir",
1393 "/tmp/repo",
1394 "Fix",
1395 "the",
1396 "bug",
1397 ])
1398 .unwrap();
1399 assert!(matches!(
1400 &cli.command,
1401 Commands::Spawn { name, workdir, provider, auto, unrestricted, prompt, .. }
1402 if name == "my-task" && workdir.as_deref() == Some("/tmp/repo")
1403 && provider.is_none() && !auto && !unrestricted
1404 && prompt == &["Fix", "the", "bug"]
1405 ));
1406 }
1407
1408 #[test]
1409 fn test_cli_parse_spawn_with_provider() {
1410 let cli =
1411 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--provider", "codex", "Do it"])
1412 .unwrap();
1413 assert!(matches!(
1414 &cli.command,
1415 Commands::Spawn { provider, .. } if provider.as_deref() == Some("codex")
1416 ));
1417 }
1418
1419 #[test]
1420 fn test_cli_parse_spawn_auto() {
1421 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--auto", "Do it"]).unwrap();
1422 assert!(matches!(
1423 &cli.command,
1424 Commands::Spawn { auto, .. } if *auto
1425 ));
1426 }
1427
1428 #[test]
1429 fn test_cli_parse_spawn_unrestricted() {
1430 let cli =
1431 Cli::try_parse_from(["pulpo", "spawn", "my-task", "--unrestricted", "Do it"]).unwrap();
1432 assert!(matches!(
1433 &cli.command,
1434 Commands::Spawn { unrestricted, .. } if *unrestricted
1435 ));
1436 }
1437
1438 #[test]
1439 fn test_cli_parse_spawn_unrestricted_default() {
1440 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "Do it"]).unwrap();
1441 assert!(matches!(
1442 &cli.command,
1443 Commands::Spawn { unrestricted, .. } if !unrestricted
1444 ));
1445 }
1446
1447 #[test]
1448 fn test_cli_parse_spawn_name_positional() {
1449 let cli = Cli::try_parse_from(["pulpo", "spawn", "portal", "Fix", "it"]).unwrap();
1450 assert!(matches!(
1451 &cli.command,
1452 Commands::Spawn { name, prompt, .. }
1453 if name == "portal" && prompt == &["Fix", "it"]
1454 ));
1455 }
1456
1457 #[test]
1458 fn test_cli_parse_spawn_with_conversation_id() {
1459 let cli = Cli::try_parse_from([
1460 "pulpo",
1461 "spawn",
1462 "my-task",
1463 "--conversation-id",
1464 "conv-abc-123",
1465 "Fix it",
1466 ])
1467 .unwrap();
1468 assert!(matches!(
1469 &cli.command,
1470 Commands::Spawn { conversation_id, .. }
1471 if conversation_id.as_deref() == Some("conv-abc-123")
1472 ));
1473 }
1474
1475 #[test]
1476 fn test_cli_parse_spawn_without_conversation_id() {
1477 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "Fix it"]).unwrap();
1478 assert!(matches!(
1479 &cli.command,
1480 Commands::Spawn { conversation_id, .. } if conversation_id.is_none()
1481 ));
1482 }
1483
1484 #[test]
1485 fn test_cli_parse_spawn_detach() {
1486 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "--detach", "Do it"]).unwrap();
1487 assert!(matches!(
1488 &cli.command,
1489 Commands::Spawn { detach, .. } if *detach
1490 ));
1491 }
1492
1493 #[test]
1494 fn test_cli_parse_spawn_detach_short() {
1495 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "-d", "Do it"]).unwrap();
1496 assert!(matches!(
1497 &cli.command,
1498 Commands::Spawn { detach, .. } if *detach
1499 ));
1500 }
1501
1502 #[test]
1503 fn test_cli_parse_spawn_detach_default() {
1504 let cli = Cli::try_parse_from(["pulpo", "spawn", "my-task", "Do it"]).unwrap();
1505 assert!(matches!(
1506 &cli.command,
1507 Commands::Spawn { detach, .. } if !detach
1508 ));
1509 }
1510
1511 #[test]
1512 fn test_cli_parse_logs() {
1513 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1514 assert!(matches!(
1515 &cli.command,
1516 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1517 ));
1518 }
1519
1520 #[test]
1521 fn test_cli_parse_logs_with_lines() {
1522 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1523 assert!(matches!(
1524 &cli.command,
1525 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 50 && !follow
1526 ));
1527 }
1528
1529 #[test]
1530 fn test_cli_parse_logs_follow() {
1531 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1532 assert!(matches!(
1533 &cli.command,
1534 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1535 ));
1536 }
1537
1538 #[test]
1539 fn test_cli_parse_logs_follow_short() {
1540 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1541 assert!(matches!(
1542 &cli.command,
1543 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1544 ));
1545 }
1546
1547 #[test]
1548 fn test_cli_parse_kill() {
1549 let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1550 assert!(matches!(
1551 &cli.command,
1552 Commands::Kill { name } if name == "my-session"
1553 ));
1554 }
1555
1556 #[test]
1557 fn test_cli_parse_delete() {
1558 let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1559 assert!(matches!(
1560 &cli.command,
1561 Commands::Delete { name } if name == "my-session"
1562 ));
1563 }
1564
1565 #[test]
1566 fn test_cli_parse_resume() {
1567 let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1568 assert!(matches!(
1569 &cli.command,
1570 Commands::Resume { name } if name == "my-session"
1571 ));
1572 }
1573
1574 #[test]
1575 fn test_cli_parse_input() {
1576 let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1577 assert!(matches!(
1578 &cli.command,
1579 Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("yes")
1580 ));
1581 }
1582
1583 #[test]
1584 fn test_cli_parse_input_no_text() {
1585 let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1586 assert!(matches!(
1587 &cli.command,
1588 Commands::Input { name, text } if name == "my-session" && text.is_none()
1589 ));
1590 }
1591
1592 #[test]
1593 fn test_cli_parse_input_alias() {
1594 let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1595 assert!(matches!(
1596 &cli.command,
1597 Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("y")
1598 ));
1599 }
1600
1601 #[test]
1602 fn test_cli_parse_custom_node() {
1603 let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1604 assert_eq!(cli.node, "win-pc:8080");
1605 }
1606
1607 #[test]
1608 fn test_cli_version() {
1609 let result = Cli::try_parse_from(["pulpo", "--version"]);
1610 let err = result.unwrap_err();
1612 assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1613 }
1614
1615 #[test]
1616 fn test_cli_parse_no_subcommand_fails() {
1617 let result = Cli::try_parse_from(["pulpo"]);
1618 assert!(result.is_err());
1619 }
1620
1621 #[test]
1622 fn test_cli_debug() {
1623 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1624 let debug = format!("{cli:?}");
1625 assert!(debug.contains("List"));
1626 }
1627
1628 #[test]
1629 fn test_commands_debug() {
1630 let cmd = Commands::List;
1631 assert_eq!(format!("{cmd:?}"), "List");
1632 }
1633
1634 const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","provider":"claude","prompt":"Fix bug","status":"active","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1636
1637 fn test_create_response_json() -> String {
1639 format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1640 }
1641
1642 async fn start_test_server() -> String {
1644 use axum::http::StatusCode;
1645 use axum::{
1646 Json, Router,
1647 routing::{get, post},
1648 };
1649
1650 let create_json = test_create_response_json();
1651
1652 let app = Router::new()
1653 .route(
1654 "/api/v1/sessions",
1655 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1656 (StatusCode::CREATED, create_json.clone())
1657 }),
1658 )
1659 .route(
1660 "/api/v1/sessions/{id}",
1661 get(|| async { TEST_SESSION_JSON.to_owned() })
1662 .delete(|| async { StatusCode::NO_CONTENT }),
1663 )
1664 .route(
1665 "/api/v1/sessions/{id}/kill",
1666 post(|| async { StatusCode::NO_CONTENT }),
1667 )
1668 .route(
1669 "/api/v1/sessions/{id}/output",
1670 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1671 )
1672 .route(
1673 "/api/v1/peers",
1674 get(|| async {
1675 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1676 }),
1677 )
1678 .route(
1679 "/api/v1/sessions/{id}/resume",
1680 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1681 )
1682 .route(
1683 "/api/v1/sessions/{id}/interventions",
1684 get(|| async { "[]".to_owned() }),
1685 )
1686 .route(
1687 "/api/v1/sessions/{id}/input",
1688 post(|| async { StatusCode::NO_CONTENT }),
1689 );
1690
1691 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1692 let addr = listener.local_addr().unwrap();
1693 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1694 format!("127.0.0.1:{}", addr.port())
1695 }
1696
1697 #[tokio::test]
1698 async fn test_execute_list_success() {
1699 let node = start_test_server().await;
1700 let cli = Cli {
1701 node,
1702 token: None,
1703 command: Commands::List,
1704 };
1705 let result = execute(&cli).await.unwrap();
1706 assert_eq!(result, "No sessions.");
1707 }
1708
1709 #[tokio::test]
1710 async fn test_execute_nodes_success() {
1711 let node = start_test_server().await;
1712 let cli = Cli {
1713 node,
1714 token: None,
1715 command: Commands::Nodes,
1716 };
1717 let result = execute(&cli).await.unwrap();
1718 assert!(result.contains("test"));
1719 assert!(result.contains("(local)"));
1720 assert!(result.contains("NAME"));
1721 }
1722
1723 #[tokio::test]
1724 async fn test_execute_spawn_success() {
1725 let node = start_test_server().await;
1726 let cli = Cli {
1727 node,
1728 token: None,
1729 command: Commands::Spawn {
1730 name: "test".into(),
1731 workdir: Some("/tmp/repo".into()),
1732 provider: Some("claude".into()),
1733 auto: false,
1734 unrestricted: false,
1735 model: None,
1736 system_prompt: None,
1737 allowed_tools: None,
1738 ink: None,
1739 max_turns: None,
1740 max_budget: None,
1741 output_format: None,
1742 worktree: false,
1743 conversation_id: None,
1744 detach: true,
1745 prompt: vec!["Fix".into(), "bug".into()],
1746 },
1747 };
1748 let result = execute(&cli).await.unwrap();
1749 assert!(result.contains("Created session"));
1750 assert!(result.contains("repo"));
1751 }
1752
1753 #[tokio::test]
1754 async fn test_execute_spawn_with_all_new_flags() {
1755 let node = start_test_server().await;
1756 let cli = Cli {
1757 node,
1758 token: None,
1759 command: Commands::Spawn {
1760 name: "test".into(),
1761 workdir: Some("/tmp/repo".into()),
1762 provider: Some("claude".into()),
1763 auto: false,
1764 unrestricted: false,
1765 model: Some("opus".into()),
1766 system_prompt: Some("Be helpful".into()),
1767 allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1768 ink: Some("coder".into()),
1769 max_turns: Some(5),
1770 max_budget: Some(2.5),
1771 output_format: Some("json".into()),
1772 worktree: false,
1773 conversation_id: None,
1774 detach: true,
1775 prompt: vec!["Fix".into(), "bug".into()],
1776 },
1777 };
1778 let result = execute(&cli).await.unwrap();
1779 assert!(result.contains("Created session"));
1780 }
1781
1782 #[tokio::test]
1783 async fn test_execute_spawn_auto_mode() {
1784 let node = start_test_server().await;
1785 let cli = Cli {
1786 node,
1787 token: None,
1788 command: Commands::Spawn {
1789 name: "test".into(),
1790 workdir: Some("/tmp/repo".into()),
1791 provider: Some("claude".into()),
1792 auto: true,
1793 unrestricted: false,
1794 model: None,
1795 system_prompt: None,
1796 allowed_tools: None,
1797 ink: None,
1798 max_turns: None,
1799 max_budget: None,
1800 output_format: None,
1801 worktree: false,
1802 conversation_id: None,
1803 detach: true,
1804 prompt: vec!["Do it".into()],
1805 },
1806 };
1807 let result = execute(&cli).await.unwrap();
1808 assert!(result.contains("Created session"));
1809 }
1810
1811 #[tokio::test]
1812 async fn test_execute_spawn_with_name() {
1813 let node = start_test_server().await;
1814 let cli = Cli {
1815 node,
1816 token: None,
1817 command: Commands::Spawn {
1818 name: "my-task".into(),
1819 workdir: Some("/tmp/repo".into()),
1820 provider: Some("claude".into()),
1821 auto: false,
1822 unrestricted: false,
1823 model: None,
1824 system_prompt: None,
1825 allowed_tools: None,
1826 ink: None,
1827 max_turns: None,
1828 max_budget: None,
1829 output_format: None,
1830 worktree: false,
1831 conversation_id: None,
1832 detach: true,
1833 prompt: vec!["Fix".into(), "bug".into()],
1834 },
1835 };
1836 let result = execute(&cli).await.unwrap();
1837 assert!(result.contains("Created session"));
1838 }
1839
1840 #[tokio::test]
1841 async fn test_execute_spawn_auto_attach() {
1842 let node = start_test_server().await;
1843 let cli = Cli {
1844 node,
1845 token: None,
1846 command: Commands::Spawn {
1847 name: "test".into(),
1848 workdir: Some("/tmp/repo".into()),
1849 provider: Some("claude".into()),
1850 auto: false,
1851 unrestricted: false,
1852 model: None,
1853 system_prompt: None,
1854 allowed_tools: None,
1855 ink: None,
1856 max_turns: None,
1857 max_budget: None,
1858 output_format: None,
1859 worktree: false,
1860 conversation_id: None,
1861 detach: false,
1862 prompt: vec!["Fix".into(), "bug".into()],
1863 },
1864 };
1865 let result = execute(&cli).await.unwrap();
1866 assert!(result.contains("Detached from session"));
1868 }
1869
1870 #[tokio::test]
1871 async fn test_execute_kill_success() {
1872 let node = start_test_server().await;
1873 let cli = Cli {
1874 node,
1875 token: None,
1876 command: Commands::Kill {
1877 name: "test-session".into(),
1878 },
1879 };
1880 let result = execute(&cli).await.unwrap();
1881 assert!(result.contains("killed"));
1882 }
1883
1884 #[tokio::test]
1885 async fn test_execute_delete_success() {
1886 let node = start_test_server().await;
1887 let cli = Cli {
1888 node,
1889 token: None,
1890 command: Commands::Delete {
1891 name: "test-session".into(),
1892 },
1893 };
1894 let result = execute(&cli).await.unwrap();
1895 assert!(result.contains("deleted"));
1896 }
1897
1898 #[tokio::test]
1899 async fn test_execute_logs_success() {
1900 let node = start_test_server().await;
1901 let cli = Cli {
1902 node,
1903 token: None,
1904 command: Commands::Logs {
1905 name: "test-session".into(),
1906 lines: 50,
1907 follow: false,
1908 },
1909 };
1910 let result = execute(&cli).await.unwrap();
1911 assert!(result.contains("test output"));
1912 }
1913
1914 #[tokio::test]
1915 async fn test_execute_list_connection_refused() {
1916 let cli = Cli {
1917 node: "localhost:1".into(),
1918 token: None,
1919 command: Commands::List,
1920 };
1921 let result = execute(&cli).await;
1922 let err = result.unwrap_err().to_string();
1923 assert!(
1924 err.contains("Could not connect to pulpod"),
1925 "Expected friendly error, got: {err}"
1926 );
1927 assert!(err.contains("localhost:1"));
1928 }
1929
1930 #[tokio::test]
1931 async fn test_execute_nodes_connection_refused() {
1932 let cli = Cli {
1933 node: "localhost:1".into(),
1934 token: None,
1935 command: Commands::Nodes,
1936 };
1937 let result = execute(&cli).await;
1938 let err = result.unwrap_err().to_string();
1939 assert!(err.contains("Could not connect to pulpod"));
1940 }
1941
1942 #[tokio::test]
1943 async fn test_execute_kill_error_response() {
1944 use axum::{Router, http::StatusCode, routing::post};
1945
1946 let app = Router::new().route(
1947 "/api/v1/sessions/{id}/kill",
1948 post(|| async {
1949 (
1950 StatusCode::NOT_FOUND,
1951 "{\"error\":\"session not found: test-session\"}",
1952 )
1953 }),
1954 );
1955 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1956 let addr = listener.local_addr().unwrap();
1957 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1958 let node = format!("127.0.0.1:{}", addr.port());
1959
1960 let cli = Cli {
1961 node,
1962 token: None,
1963 command: Commands::Kill {
1964 name: "test-session".into(),
1965 },
1966 };
1967 let err = execute(&cli).await.unwrap_err();
1968 assert_eq!(err.to_string(), "session not found: test-session");
1969 }
1970
1971 #[tokio::test]
1972 async fn test_execute_delete_error_response() {
1973 use axum::{Router, http::StatusCode, routing::delete};
1974
1975 let app = Router::new().route(
1976 "/api/v1/sessions/{id}",
1977 delete(|| async {
1978 (
1979 StatusCode::CONFLICT,
1980 "{\"error\":\"cannot delete session in 'running' state\"}",
1981 )
1982 }),
1983 );
1984 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1985 let addr = listener.local_addr().unwrap();
1986 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1987 let node = format!("127.0.0.1:{}", addr.port());
1988
1989 let cli = Cli {
1990 node,
1991 token: None,
1992 command: Commands::Delete {
1993 name: "test-session".into(),
1994 },
1995 };
1996 let err = execute(&cli).await.unwrap_err();
1997 assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1998 }
1999
2000 #[tokio::test]
2001 async fn test_execute_logs_error_response() {
2002 use axum::{Router, http::StatusCode, routing::get};
2003
2004 let app = Router::new().route(
2005 "/api/v1/sessions/{id}/output",
2006 get(|| async {
2007 (
2008 StatusCode::NOT_FOUND,
2009 "{\"error\":\"session not found: ghost\"}",
2010 )
2011 }),
2012 );
2013 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2014 let addr = listener.local_addr().unwrap();
2015 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2016 let node = format!("127.0.0.1:{}", addr.port());
2017
2018 let cli = Cli {
2019 node,
2020 token: None,
2021 command: Commands::Logs {
2022 name: "ghost".into(),
2023 lines: 50,
2024 follow: false,
2025 },
2026 };
2027 let err = execute(&cli).await.unwrap_err();
2028 assert_eq!(err.to_string(), "session not found: ghost");
2029 }
2030
2031 #[tokio::test]
2032 async fn test_execute_resume_error_response() {
2033 use axum::{Router, http::StatusCode, routing::post};
2034
2035 let app = Router::new().route(
2036 "/api/v1/sessions/{id}/resume",
2037 post(|| async {
2038 (
2039 StatusCode::BAD_REQUEST,
2040 "{\"error\":\"session is not lost (status: active)\"}",
2041 )
2042 }),
2043 );
2044 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2045 let addr = listener.local_addr().unwrap();
2046 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2047 let node = format!("127.0.0.1:{}", addr.port());
2048
2049 let cli = Cli {
2050 node,
2051 token: None,
2052 command: Commands::Resume {
2053 name: "test-session".into(),
2054 },
2055 };
2056 let err = execute(&cli).await.unwrap_err();
2057 assert_eq!(err.to_string(), "session is not lost (status: active)");
2058 }
2059
2060 #[tokio::test]
2061 async fn test_execute_spawn_error_response() {
2062 use axum::{Router, http::StatusCode, routing::post};
2063
2064 let app = Router::new().route(
2065 "/api/v1/sessions",
2066 post(|| async {
2067 (
2068 StatusCode::INTERNAL_SERVER_ERROR,
2069 "{\"error\":\"failed to spawn session\"}",
2070 )
2071 }),
2072 );
2073 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2074 let addr = listener.local_addr().unwrap();
2075 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2076 let node = format!("127.0.0.1:{}", addr.port());
2077
2078 let cli = Cli {
2079 node,
2080 token: None,
2081 command: Commands::Spawn {
2082 name: "test".into(),
2083 workdir: Some("/tmp/repo".into()),
2084 provider: Some("claude".into()),
2085 auto: false,
2086 unrestricted: false,
2087 model: None,
2088 system_prompt: None,
2089 allowed_tools: None,
2090 ink: None,
2091 max_turns: None,
2092 max_budget: None,
2093 output_format: None,
2094 worktree: false,
2095 conversation_id: None,
2096 detach: true,
2097 prompt: vec!["test".into()],
2098 },
2099 };
2100 let err = execute(&cli).await.unwrap_err();
2101 assert_eq!(err.to_string(), "failed to spawn session");
2102 }
2103
2104 #[tokio::test]
2105 async fn test_execute_interventions_error_response() {
2106 use axum::{Router, http::StatusCode, routing::get};
2107
2108 let app = Router::new().route(
2109 "/api/v1/sessions/{id}/interventions",
2110 get(|| async {
2111 (
2112 StatusCode::NOT_FOUND,
2113 "{\"error\":\"session not found: ghost\"}",
2114 )
2115 }),
2116 );
2117 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2118 let addr = listener.local_addr().unwrap();
2119 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2120 let node = format!("127.0.0.1:{}", addr.port());
2121
2122 let cli = Cli {
2123 node,
2124 token: None,
2125 command: Commands::Interventions {
2126 name: "ghost".into(),
2127 },
2128 };
2129 let err = execute(&cli).await.unwrap_err();
2130 assert_eq!(err.to_string(), "session not found: ghost");
2131 }
2132
2133 #[tokio::test]
2134 async fn test_execute_resume_success() {
2135 let node = start_test_server().await;
2136 let cli = Cli {
2137 node,
2138 token: None,
2139 command: Commands::Resume {
2140 name: "test-session".into(),
2141 },
2142 };
2143 let result = execute(&cli).await.unwrap();
2144 assert!(result.contains("Detached from session"));
2145 }
2146
2147 #[tokio::test]
2148 async fn test_execute_input_success() {
2149 let node = start_test_server().await;
2150 let cli = Cli {
2151 node,
2152 token: None,
2153 command: Commands::Input {
2154 name: "test-session".into(),
2155 text: Some("yes".into()),
2156 },
2157 };
2158 let result = execute(&cli).await.unwrap();
2159 assert!(result.contains("Sent input to session test-session"));
2160 }
2161
2162 #[tokio::test]
2163 async fn test_execute_input_no_text() {
2164 let node = start_test_server().await;
2165 let cli = Cli {
2166 node,
2167 token: None,
2168 command: Commands::Input {
2169 name: "test-session".into(),
2170 text: None,
2171 },
2172 };
2173 let result = execute(&cli).await.unwrap();
2174 assert!(result.contains("Sent input to session test-session"));
2175 }
2176
2177 #[tokio::test]
2178 async fn test_execute_input_connection_refused() {
2179 let cli = Cli {
2180 node: "localhost:1".into(),
2181 token: None,
2182 command: Commands::Input {
2183 name: "test".into(),
2184 text: Some("y".into()),
2185 },
2186 };
2187 let result = execute(&cli).await;
2188 let err = result.unwrap_err().to_string();
2189 assert!(err.contains("Could not connect to pulpod"));
2190 }
2191
2192 #[tokio::test]
2193 async fn test_execute_input_error_response() {
2194 use axum::{Router, http::StatusCode, routing::post};
2195
2196 let app = Router::new().route(
2197 "/api/v1/sessions/{id}/input",
2198 post(|| async {
2199 (
2200 StatusCode::NOT_FOUND,
2201 "{\"error\":\"session not found: ghost\"}",
2202 )
2203 }),
2204 );
2205 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2206 let addr = listener.local_addr().unwrap();
2207 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2208 let node = format!("127.0.0.1:{}", addr.port());
2209
2210 let cli = Cli {
2211 node,
2212 token: None,
2213 command: Commands::Input {
2214 name: "ghost".into(),
2215 text: Some("y".into()),
2216 },
2217 };
2218 let err = execute(&cli).await.unwrap_err();
2219 assert_eq!(err.to_string(), "session not found: ghost");
2220 }
2221
2222 #[tokio::test]
2223 async fn test_execute_ui() {
2224 let cli = Cli {
2225 node: "localhost:7433".into(),
2226 token: None,
2227 command: Commands::Ui,
2228 };
2229 let result = execute(&cli).await.unwrap();
2230 assert!(result.contains("Opening"));
2231 assert!(result.contains("http://localhost:7433"));
2232 }
2233
2234 #[tokio::test]
2235 async fn test_execute_ui_custom_node() {
2236 let cli = Cli {
2237 node: "mac-mini:7433".into(),
2238 token: None,
2239 command: Commands::Ui,
2240 };
2241 let result = execute(&cli).await.unwrap();
2242 assert!(result.contains("http://mac-mini:7433"));
2243 }
2244
2245 #[test]
2246 fn test_format_sessions_empty() {
2247 assert_eq!(format_sessions(&[]), "No sessions.");
2248 }
2249
2250 #[test]
2251 fn test_format_sessions_with_data() {
2252 use chrono::Utc;
2253 use pulpo_common::session::{Provider, SessionMode, SessionStatus};
2254 use uuid::Uuid;
2255
2256 let sessions = vec![Session {
2257 id: Uuid::nil(),
2258 name: "my-api".into(),
2259 workdir: "/tmp/repo".into(),
2260 provider: Provider::Claude,
2261 prompt: "Fix the bug".into(),
2262 status: SessionStatus::Active,
2263 mode: SessionMode::Interactive,
2264 conversation_id: None,
2265 exit_code: None,
2266 backend_session_id: None,
2267 output_snapshot: None,
2268 guard_config: None,
2269 model: None,
2270 allowed_tools: None,
2271 system_prompt: None,
2272 metadata: None,
2273 ink: None,
2274 max_turns: None,
2275 max_budget_usd: None,
2276 output_format: None,
2277 intervention_code: None,
2278 intervention_reason: None,
2279 intervention_at: None,
2280 last_output_at: None,
2281 idle_since: None,
2282 created_at: Utc::now(),
2283 updated_at: Utc::now(),
2284 }];
2285 let output = format_sessions(&sessions);
2286 assert!(output.contains("NAME"));
2287 assert!(output.contains("my-api"));
2288 assert!(output.contains("active"));
2289 assert!(output.contains("claude"));
2290 assert!(output.contains("Fix the bug"));
2291 }
2292
2293 #[test]
2294 fn test_format_sessions_long_prompt_truncated() {
2295 use chrono::Utc;
2296 use pulpo_common::session::{Provider, SessionMode, SessionStatus};
2297 use uuid::Uuid;
2298
2299 let sessions = vec![Session {
2300 id: Uuid::nil(),
2301 name: "test".into(),
2302 workdir: "/tmp".into(),
2303 provider: Provider::Codex,
2304 prompt: "A very long prompt that exceeds forty characters in total length".into(),
2305 status: SessionStatus::Finished,
2306 mode: SessionMode::Autonomous,
2307 conversation_id: None,
2308 exit_code: None,
2309 backend_session_id: None,
2310 output_snapshot: None,
2311 guard_config: None,
2312 model: None,
2313 allowed_tools: None,
2314 system_prompt: None,
2315 metadata: None,
2316 ink: None,
2317 max_turns: None,
2318 max_budget_usd: None,
2319 output_format: None,
2320 intervention_code: None,
2321 intervention_reason: None,
2322 intervention_at: None,
2323 last_output_at: None,
2324 idle_since: None,
2325 created_at: Utc::now(),
2326 updated_at: Utc::now(),
2327 }];
2328 let output = format_sessions(&sessions);
2329 assert!(output.contains("..."));
2330 }
2331
2332 #[test]
2333 fn test_format_nodes() {
2334 use pulpo_common::node::NodeInfo;
2335 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2336
2337 let resp = PeersResponse {
2338 local: NodeInfo {
2339 name: "mac-mini".into(),
2340 hostname: "h".into(),
2341 os: "macos".into(),
2342 arch: "arm64".into(),
2343 cpus: 8,
2344 memory_mb: 16384,
2345 gpu: None,
2346 },
2347 peers: vec![PeerInfo {
2348 name: "win-pc".into(),
2349 address: "win-pc:7433".into(),
2350 status: PeerStatus::Online,
2351 node_info: None,
2352 session_count: Some(3),
2353 source: PeerSource::Configured,
2354 }],
2355 };
2356 let output = format_nodes(&resp);
2357 assert!(output.contains("mac-mini"));
2358 assert!(output.contains("(local)"));
2359 assert!(output.contains("win-pc"));
2360 assert!(output.contains('3'));
2361 }
2362
2363 #[test]
2364 fn test_format_nodes_no_session_count() {
2365 use pulpo_common::node::NodeInfo;
2366 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2367
2368 let resp = PeersResponse {
2369 local: NodeInfo {
2370 name: "local".into(),
2371 hostname: "h".into(),
2372 os: "linux".into(),
2373 arch: "x86_64".into(),
2374 cpus: 4,
2375 memory_mb: 8192,
2376 gpu: None,
2377 },
2378 peers: vec![PeerInfo {
2379 name: "peer".into(),
2380 address: "peer:7433".into(),
2381 status: PeerStatus::Offline,
2382 node_info: None,
2383 session_count: None,
2384 source: PeerSource::Configured,
2385 }],
2386 };
2387 let output = format_nodes(&resp);
2388 assert!(output.contains("offline"));
2389 let lines: Vec<&str> = output.lines().collect();
2391 assert!(lines[2].contains('-'));
2392 }
2393
2394 #[tokio::test]
2395 async fn test_execute_resume_connection_refused() {
2396 let cli = Cli {
2397 node: "localhost:1".into(),
2398 token: None,
2399 command: Commands::Resume {
2400 name: "test".into(),
2401 },
2402 };
2403 let result = execute(&cli).await;
2404 let err = result.unwrap_err().to_string();
2405 assert!(err.contains("Could not connect to pulpod"));
2406 }
2407
2408 #[tokio::test]
2409 async fn test_execute_spawn_connection_refused() {
2410 let cli = Cli {
2411 node: "localhost:1".into(),
2412 token: None,
2413 command: Commands::Spawn {
2414 name: "test".into(),
2415 workdir: Some("/tmp".into()),
2416 provider: Some("claude".into()),
2417 auto: false,
2418 unrestricted: false,
2419 model: None,
2420 system_prompt: None,
2421 allowed_tools: None,
2422 ink: None,
2423 max_turns: None,
2424 max_budget: None,
2425 output_format: None,
2426 worktree: false,
2427 conversation_id: None,
2428 detach: true,
2429 prompt: vec!["test".into()],
2430 },
2431 };
2432 let result = execute(&cli).await;
2433 let err = result.unwrap_err().to_string();
2434 assert!(err.contains("Could not connect to pulpod"));
2435 }
2436
2437 #[tokio::test]
2438 async fn test_execute_kill_connection_refused() {
2439 let cli = Cli {
2440 node: "localhost:1".into(),
2441 token: None,
2442 command: Commands::Kill {
2443 name: "test".into(),
2444 },
2445 };
2446 let result = execute(&cli).await;
2447 let err = result.unwrap_err().to_string();
2448 assert!(err.contains("Could not connect to pulpod"));
2449 }
2450
2451 #[tokio::test]
2452 async fn test_execute_delete_connection_refused() {
2453 let cli = Cli {
2454 node: "localhost:1".into(),
2455 token: None,
2456 command: Commands::Delete {
2457 name: "test".into(),
2458 },
2459 };
2460 let result = execute(&cli).await;
2461 let err = result.unwrap_err().to_string();
2462 assert!(err.contains("Could not connect to pulpod"));
2463 }
2464
2465 #[tokio::test]
2466 async fn test_execute_logs_connection_refused() {
2467 let cli = Cli {
2468 node: "localhost:1".into(),
2469 token: None,
2470 command: Commands::Logs {
2471 name: "test".into(),
2472 lines: 50,
2473 follow: false,
2474 },
2475 };
2476 let result = execute(&cli).await;
2477 let err = result.unwrap_err().to_string();
2478 assert!(err.contains("Could not connect to pulpod"));
2479 }
2480
2481 #[tokio::test]
2482 async fn test_friendly_error_connect() {
2483 let err = reqwest::Client::new()
2485 .get("http://127.0.0.1:1")
2486 .send()
2487 .await
2488 .unwrap_err();
2489 let friendly = friendly_error(&err, "test-node:1");
2490 let msg = friendly.to_string();
2491 assert!(
2492 msg.contains("Could not connect"),
2493 "Expected connect message, got: {msg}"
2494 );
2495 }
2496
2497 #[tokio::test]
2498 async fn test_friendly_error_other() {
2499 let err = reqwest::Client::new()
2501 .get("http://[::invalid::url")
2502 .send()
2503 .await
2504 .unwrap_err();
2505 let friendly = friendly_error(&err, "bad-host");
2506 let msg = friendly.to_string();
2507 assert!(
2508 msg.contains("Network error"),
2509 "Expected network error message, got: {msg}"
2510 );
2511 assert!(msg.contains("bad-host"));
2512 }
2513
2514 #[test]
2517 fn test_is_localhost_variants() {
2518 assert!(is_localhost("localhost:7433"));
2519 assert!(is_localhost("127.0.0.1:7433"));
2520 assert!(is_localhost("[::1]:7433"));
2521 assert!(is_localhost("::1"));
2522 assert!(is_localhost("localhost"));
2523 assert!(!is_localhost("mac-mini:7433"));
2524 assert!(!is_localhost("192.168.1.100:7433"));
2525 }
2526
2527 #[test]
2528 fn test_authed_get_with_token() {
2529 let client = reqwest::Client::new();
2530 let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2531 .build()
2532 .unwrap();
2533 let auth = req
2534 .headers()
2535 .get("authorization")
2536 .unwrap()
2537 .to_str()
2538 .unwrap();
2539 assert_eq!(auth, "Bearer tok");
2540 }
2541
2542 #[test]
2543 fn test_authed_get_without_token() {
2544 let client = reqwest::Client::new();
2545 let req = authed_get(&client, "http://h:1/api".into(), None)
2546 .build()
2547 .unwrap();
2548 assert!(req.headers().get("authorization").is_none());
2549 }
2550
2551 #[test]
2552 fn test_authed_post_with_token() {
2553 let client = reqwest::Client::new();
2554 let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2555 .build()
2556 .unwrap();
2557 let auth = req
2558 .headers()
2559 .get("authorization")
2560 .unwrap()
2561 .to_str()
2562 .unwrap();
2563 assert_eq!(auth, "Bearer secret");
2564 }
2565
2566 #[test]
2567 fn test_authed_post_without_token() {
2568 let client = reqwest::Client::new();
2569 let req = authed_post(&client, "http://h:1/api".into(), None)
2570 .build()
2571 .unwrap();
2572 assert!(req.headers().get("authorization").is_none());
2573 }
2574
2575 #[test]
2576 fn test_authed_delete_with_token() {
2577 let client = reqwest::Client::new();
2578 let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2579 .build()
2580 .unwrap();
2581 let auth = req
2582 .headers()
2583 .get("authorization")
2584 .unwrap()
2585 .to_str()
2586 .unwrap();
2587 assert_eq!(auth, "Bearer del-tok");
2588 }
2589
2590 #[test]
2591 fn test_authed_delete_without_token() {
2592 let client = reqwest::Client::new();
2593 let req = authed_delete(&client, "http://h:1/api".into(), None)
2594 .build()
2595 .unwrap();
2596 assert!(req.headers().get("authorization").is_none());
2597 }
2598
2599 #[tokio::test]
2600 async fn test_resolve_token_explicit() {
2601 let client = reqwest::Client::new();
2602 let token =
2603 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2604 assert_eq!(token, Some("my-tok".into()));
2605 }
2606
2607 #[tokio::test]
2608 async fn test_resolve_token_remote_no_explicit() {
2609 let client = reqwest::Client::new();
2610 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2611 assert_eq!(token, None);
2612 }
2613
2614 #[tokio::test]
2615 async fn test_resolve_token_localhost_auto_discover() {
2616 use axum::{Json, Router, routing::get};
2617
2618 let app = Router::new().route(
2619 "/api/v1/auth/token",
2620 get(|| async {
2621 Json(AuthTokenResponse {
2622 token: "discovered".into(),
2623 })
2624 }),
2625 );
2626 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2627 let addr = listener.local_addr().unwrap();
2628 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2629
2630 let node = format!("localhost:{}", addr.port());
2631 let base = base_url(&node);
2632 let client = reqwest::Client::new();
2633 let token = resolve_token(&client, &base, &node, None).await;
2634 assert_eq!(token, Some("discovered".into()));
2635 }
2636
2637 #[tokio::test]
2638 async fn test_discover_token_empty_returns_none() {
2639 use axum::{Json, Router, routing::get};
2640
2641 let app = Router::new().route(
2642 "/api/v1/auth/token",
2643 get(|| async {
2644 Json(AuthTokenResponse {
2645 token: String::new(),
2646 })
2647 }),
2648 );
2649 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2650 let addr = listener.local_addr().unwrap();
2651 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2652
2653 let base = format!("http://127.0.0.1:{}", addr.port());
2654 let client = reqwest::Client::new();
2655 assert_eq!(discover_token(&client, &base).await, None);
2656 }
2657
2658 #[tokio::test]
2659 async fn test_discover_token_unreachable_returns_none() {
2660 let client = reqwest::Client::new();
2661 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2662 }
2663
2664 #[test]
2665 fn test_cli_parse_with_token() {
2666 let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
2667 assert_eq!(cli.token, Some("my-secret".into()));
2668 }
2669
2670 #[test]
2671 fn test_cli_parse_without_token() {
2672 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2673 assert_eq!(cli.token, None);
2674 }
2675
2676 #[tokio::test]
2677 async fn test_execute_with_explicit_token_sends_header() {
2678 use axum::{Router, extract::Request, http::StatusCode, routing::get};
2679
2680 let app = Router::new().route(
2681 "/api/v1/sessions",
2682 get(|req: Request| async move {
2683 let auth = req
2684 .headers()
2685 .get("authorization")
2686 .and_then(|v| v.to_str().ok())
2687 .unwrap_or("");
2688 assert_eq!(auth, "Bearer test-token");
2689 (StatusCode::OK, "[]".to_owned())
2690 }),
2691 );
2692 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2693 let addr = listener.local_addr().unwrap();
2694 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2695 let node = format!("127.0.0.1:{}", addr.port());
2696
2697 let cli = Cli {
2698 node,
2699 token: Some("test-token".into()),
2700 command: Commands::List,
2701 };
2702 let result = execute(&cli).await.unwrap();
2703 assert_eq!(result, "No sessions.");
2704 }
2705
2706 #[test]
2709 fn test_cli_parse_interventions() {
2710 let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
2711 assert!(matches!(
2712 &cli.command,
2713 Commands::Interventions { name } if name == "my-session"
2714 ));
2715 }
2716
2717 #[test]
2718 fn test_format_interventions_empty() {
2719 assert_eq!(format_interventions(&[]), "No intervention events.");
2720 }
2721
2722 #[test]
2723 fn test_format_interventions_with_data() {
2724 let events = vec![
2725 InterventionEventResponse {
2726 id: 1,
2727 session_id: "sess-1".into(),
2728 code: None,
2729 reason: "Memory exceeded threshold".into(),
2730 created_at: "2026-01-01T00:00:00Z".into(),
2731 },
2732 InterventionEventResponse {
2733 id: 2,
2734 session_id: "sess-1".into(),
2735 code: None,
2736 reason: "Idle for 10 minutes".into(),
2737 created_at: "2026-01-02T00:00:00Z".into(),
2738 },
2739 ];
2740 let output = format_interventions(&events);
2741 assert!(output.contains("ID"));
2742 assert!(output.contains("TIMESTAMP"));
2743 assert!(output.contains("REASON"));
2744 assert!(output.contains("Memory exceeded threshold"));
2745 assert!(output.contains("Idle for 10 minutes"));
2746 assert!(output.contains("2026-01-01T00:00:00Z"));
2747 }
2748
2749 #[tokio::test]
2750 async fn test_execute_interventions_empty() {
2751 let node = start_test_server().await;
2752 let cli = Cli {
2753 node,
2754 token: None,
2755 command: Commands::Interventions {
2756 name: "my-session".into(),
2757 },
2758 };
2759 let result = execute(&cli).await.unwrap();
2760 assert_eq!(result, "No intervention events.");
2761 }
2762
2763 #[tokio::test]
2764 async fn test_execute_interventions_with_data() {
2765 use axum::{Router, routing::get};
2766
2767 let app = Router::new().route(
2768 "/api/v1/sessions/{id}/interventions",
2769 get(|| async {
2770 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2771 .to_owned()
2772 }),
2773 );
2774 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2775 let addr = listener.local_addr().unwrap();
2776 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2777 let node = format!("127.0.0.1:{}", addr.port());
2778
2779 let cli = Cli {
2780 node,
2781 token: None,
2782 command: Commands::Interventions {
2783 name: "test".into(),
2784 },
2785 };
2786 let result = execute(&cli).await.unwrap();
2787 assert!(result.contains("OOM"));
2788 assert!(result.contains("2026-01-01T00:00:00Z"));
2789 }
2790
2791 #[tokio::test]
2792 async fn test_execute_interventions_connection_refused() {
2793 let cli = Cli {
2794 node: "localhost:1".into(),
2795 token: None,
2796 command: Commands::Interventions {
2797 name: "test".into(),
2798 },
2799 };
2800 let result = execute(&cli).await;
2801 let err = result.unwrap_err().to_string();
2802 assert!(err.contains("Could not connect to pulpod"));
2803 }
2804
2805 #[test]
2808 fn test_build_attach_command() {
2809 let cmd = build_attach_command("my-session");
2810 assert_eq!(cmd.get_program(), "tmux");
2811 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2812 assert_eq!(args, vec!["attach-session", "-t", "my-session"]);
2813 }
2814
2815 #[test]
2816 fn test_cli_parse_attach() {
2817 let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2818 assert!(matches!(
2819 &cli.command,
2820 Commands::Attach { name } if name == "my-session"
2821 ));
2822 }
2823
2824 #[test]
2825 fn test_cli_parse_attach_alias() {
2826 let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2827 assert!(matches!(
2828 &cli.command,
2829 Commands::Attach { name } if name == "my-session"
2830 ));
2831 }
2832
2833 #[tokio::test]
2834 async fn test_execute_attach_success() {
2835 let node = start_test_server().await;
2836 let cli = Cli {
2837 node,
2838 token: None,
2839 command: Commands::Attach {
2840 name: "test-session".into(),
2841 },
2842 };
2843 let result = execute(&cli).await.unwrap();
2844 assert!(result.contains("Detached from session test-session"));
2845 }
2846
2847 #[tokio::test]
2848 async fn test_execute_attach_with_backend_session_id() {
2849 use axum::{Router, routing::get};
2850 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","provider":"claude","prompt":"test","status":"active","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2851 let app = Router::new().route(
2852 "/api/v1/sessions/{id}",
2853 get(move || async move { session_json.to_owned() }),
2854 );
2855 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2856 let addr = listener.local_addr().unwrap();
2857 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2858
2859 let cli = Cli {
2860 node: format!("127.0.0.1:{}", addr.port()),
2861 token: None,
2862 command: Commands::Attach {
2863 name: "my-session".into(),
2864 },
2865 };
2866 let result = execute(&cli).await.unwrap();
2867 assert!(result.contains("Detached from session my-session"));
2868 }
2869
2870 #[tokio::test]
2871 async fn test_execute_attach_connection_refused() {
2872 let cli = Cli {
2873 node: "localhost:1".into(),
2874 token: None,
2875 command: Commands::Attach {
2876 name: "test-session".into(),
2877 },
2878 };
2879 let result = execute(&cli).await;
2880 let err = result.unwrap_err().to_string();
2881 assert!(err.contains("Could not connect to pulpod"));
2882 }
2883
2884 #[tokio::test]
2885 async fn test_execute_attach_error_response() {
2886 use axum::{Router, http::StatusCode, routing::get};
2887 let app = Router::new().route(
2888 "/api/v1/sessions/{id}",
2889 get(|| async {
2890 (
2891 StatusCode::NOT_FOUND,
2892 r#"{"error":"session not found"}"#.to_owned(),
2893 )
2894 }),
2895 );
2896 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2897 let addr = listener.local_addr().unwrap();
2898 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2899
2900 let cli = Cli {
2901 node: format!("127.0.0.1:{}", addr.port()),
2902 token: None,
2903 command: Commands::Attach {
2904 name: "nonexistent".into(),
2905 },
2906 };
2907 let result = execute(&cli).await;
2908 let err = result.unwrap_err().to_string();
2909 assert!(err.contains("session not found"));
2910 }
2911
2912 #[tokio::test]
2913 async fn test_execute_attach_stale_session() {
2914 use axum::{Router, routing::get};
2915 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","provider":"claude","prompt":"test","status":"lost","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2916 let app = Router::new().route(
2917 "/api/v1/sessions/{id}",
2918 get(move || async move { session_json.to_owned() }),
2919 );
2920 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2921 let addr = listener.local_addr().unwrap();
2922 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2923
2924 let cli = Cli {
2925 node: format!("127.0.0.1:{}", addr.port()),
2926 token: None,
2927 command: Commands::Attach {
2928 name: "stale-sess".into(),
2929 },
2930 };
2931 let result = execute(&cli).await;
2932 let err = result.unwrap_err().to_string();
2933 assert!(err.contains("lost"));
2934 assert!(err.contains("pulpo resume"));
2935 }
2936
2937 #[tokio::test]
2938 async fn test_execute_attach_dead_session() {
2939 use axum::{Router, routing::get};
2940 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","provider":"claude","prompt":"test","status":"killed","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2941 let app = Router::new().route(
2942 "/api/v1/sessions/{id}",
2943 get(move || async move { session_json.to_owned() }),
2944 );
2945 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2946 let addr = listener.local_addr().unwrap();
2947 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2948
2949 let cli = Cli {
2950 node: format!("127.0.0.1:{}", addr.port()),
2951 token: None,
2952 command: Commands::Attach {
2953 name: "dead-sess".into(),
2954 },
2955 };
2956 let result = execute(&cli).await;
2957 let err = result.unwrap_err().to_string();
2958 assert!(err.contains("killed"));
2959 assert!(err.contains("cannot attach"));
2960 }
2961
2962 #[test]
2965 fn test_cli_parse_alias_spawn() {
2966 let cli = Cli::try_parse_from(["pulpo", "s", "my-task", "Do it"]).unwrap();
2967 assert!(matches!(&cli.command, Commands::Spawn { .. }));
2968 }
2969
2970 #[test]
2971 fn test_cli_parse_alias_list() {
2972 let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2973 assert!(matches!(cli.command, Commands::List));
2974 }
2975
2976 #[test]
2977 fn test_cli_parse_alias_logs() {
2978 let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2979 assert!(matches!(
2980 &cli.command,
2981 Commands::Logs { name, .. } if name == "my-session"
2982 ));
2983 }
2984
2985 #[test]
2986 fn test_cli_parse_alias_kill() {
2987 let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2988 assert!(matches!(
2989 &cli.command,
2990 Commands::Kill { name } if name == "my-session"
2991 ));
2992 }
2993
2994 #[test]
2995 fn test_cli_parse_alias_delete() {
2996 let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2997 assert!(matches!(
2998 &cli.command,
2999 Commands::Delete { name } if name == "my-session"
3000 ));
3001 }
3002
3003 #[test]
3004 fn test_cli_parse_alias_resume() {
3005 let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
3006 assert!(matches!(
3007 &cli.command,
3008 Commands::Resume { name } if name == "my-session"
3009 ));
3010 }
3011
3012 #[test]
3013 fn test_cli_parse_alias_nodes() {
3014 let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
3015 assert!(matches!(cli.command, Commands::Nodes));
3016 }
3017
3018 #[test]
3019 fn test_cli_parse_alias_interventions() {
3020 let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
3021 assert!(matches!(
3022 &cli.command,
3023 Commands::Interventions { name } if name == "my-session"
3024 ));
3025 }
3026
3027 #[test]
3028 fn test_api_error_json() {
3029 let err = api_error("{\"error\":\"session not found: foo\"}");
3030 assert_eq!(err.to_string(), "session not found: foo");
3031 }
3032
3033 #[test]
3034 fn test_api_error_plain_text() {
3035 let err = api_error("plain text error");
3036 assert_eq!(err.to_string(), "plain text error");
3037 }
3038
3039 #[test]
3042 fn test_diff_output_empty_prev() {
3043 assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
3044 }
3045
3046 #[test]
3047 fn test_diff_output_identical() {
3048 assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
3049 }
3050
3051 #[test]
3052 fn test_diff_output_new_lines_appended() {
3053 let prev = "line1\nline2";
3054 let new = "line1\nline2\nline3\nline4";
3055 assert_eq!(diff_output(prev, new), "line3\nline4");
3056 }
3057
3058 #[test]
3059 fn test_diff_output_scrolled_window() {
3060 let prev = "line1\nline2\nline3";
3062 let new = "line2\nline3\nline4";
3063 assert_eq!(diff_output(prev, new), "line4");
3064 }
3065
3066 #[test]
3067 fn test_diff_output_completely_different() {
3068 let prev = "aaa\nbbb";
3069 let new = "xxx\nyyy";
3070 assert_eq!(diff_output(prev, new), "xxx\nyyy");
3071 }
3072
3073 #[test]
3074 fn test_diff_output_last_line_matches_but_overlap_fails() {
3075 let prev = "aaa\ncommon";
3077 let new = "zzz\ncommon\nnew_line";
3078 assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
3082 }
3083
3084 #[test]
3085 fn test_diff_output_new_empty() {
3086 assert_eq!(diff_output("line1", ""), "");
3087 }
3088
3089 async fn start_follow_test_server() -> String {
3093 use axum::{Router, extract::Path, extract::Query, routing::get};
3094 use std::sync::Arc;
3095 use std::sync::atomic::{AtomicUsize, Ordering};
3096
3097 let call_count = Arc::new(AtomicUsize::new(0));
3098 let output_count = call_count.clone();
3099 let status_count = Arc::new(AtomicUsize::new(0));
3100 let status_count_inner = status_count.clone();
3101
3102 let app = Router::new()
3103 .route(
3104 "/api/v1/sessions/{id}/output",
3105 get(
3106 move |_path: Path<String>,
3107 _query: Query<std::collections::HashMap<String, String>>| {
3108 let count = output_count.clone();
3109 async move {
3110 let n = count.fetch_add(1, Ordering::SeqCst);
3111 let output = match n {
3112 0 => "line1\nline2".to_owned(),
3113 1 => "line1\nline2\nline3".to_owned(),
3114 _ => "line2\nline3\nline4".to_owned(),
3115 };
3116 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
3117 }
3118 },
3119 ),
3120 )
3121 .route(
3122 "/api/v1/sessions/{id}",
3123 get(move |_path: Path<String>| {
3124 let count = status_count_inner.clone();
3125 async move {
3126 let n = count.fetch_add(1, Ordering::SeqCst);
3127 let status = if n < 2 { "active" } else { "finished" };
3128 format!(
3129 r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
3130 )
3131 }
3132 }),
3133 );
3134
3135 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3136 let addr = listener.local_addr().unwrap();
3137 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3138 format!("http://127.0.0.1:{}", addr.port())
3139 }
3140
3141 #[tokio::test]
3142 async fn test_follow_logs_polls_and_exits_on_completed() {
3143 let base = start_follow_test_server().await;
3144 let client = reqwest::Client::new();
3145 let mut buf = Vec::new();
3146
3147 follow_logs(&client, &base, "test", 100, None, &mut buf)
3148 .await
3149 .unwrap();
3150
3151 let output = String::from_utf8(buf).unwrap();
3152 assert!(output.contains("line1"));
3154 assert!(output.contains("line2"));
3155 assert!(output.contains("line3"));
3156 assert!(output.contains("line4"));
3157 }
3158
3159 #[tokio::test]
3160 async fn test_execute_logs_follow_success() {
3161 let base = start_follow_test_server().await;
3162 let node = base.strip_prefix("http://").unwrap().to_owned();
3164
3165 let cli = Cli {
3166 node,
3167 token: None,
3168 command: Commands::Logs {
3169 name: "test".into(),
3170 lines: 100,
3171 follow: true,
3172 },
3173 };
3174 let result = execute(&cli).await.unwrap();
3176 assert_eq!(result, "");
3177 }
3178
3179 #[tokio::test]
3180 async fn test_execute_logs_follow_connection_refused() {
3181 let cli = Cli {
3182 node: "localhost:1".into(),
3183 token: None,
3184 command: Commands::Logs {
3185 name: "test".into(),
3186 lines: 50,
3187 follow: true,
3188 },
3189 };
3190 let result = execute(&cli).await;
3191 let err = result.unwrap_err().to_string();
3192 assert!(
3193 err.contains("Could not connect to pulpod"),
3194 "Expected friendly error, got: {err}"
3195 );
3196 }
3197
3198 #[tokio::test]
3199 async fn test_follow_logs_exits_on_dead() {
3200 use axum::{Router, extract::Path, extract::Query, routing::get};
3201
3202 let app = Router::new()
3203 .route(
3204 "/api/v1/sessions/{id}/output",
3205 get(
3206 |_path: Path<String>,
3207 _query: Query<std::collections::HashMap<String, String>>| async {
3208 r#"{"output":"some output"}"#.to_owned()
3209 },
3210 ),
3211 )
3212 .route(
3213 "/api/v1/sessions/{id}",
3214 get(|_path: Path<String>| async {
3215 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"killed","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3216 }),
3217 );
3218
3219 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3220 let addr = listener.local_addr().unwrap();
3221 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3222 let base = format!("http://127.0.0.1:{}", addr.port());
3223
3224 let client = reqwest::Client::new();
3225 let mut buf = Vec::new();
3226 follow_logs(&client, &base, "test", 100, None, &mut buf)
3227 .await
3228 .unwrap();
3229
3230 let output = String::from_utf8(buf).unwrap();
3231 assert!(output.contains("some output"));
3232 }
3233
3234 #[tokio::test]
3235 async fn test_follow_logs_exits_on_stale() {
3236 use axum::{Router, extract::Path, extract::Query, routing::get};
3237
3238 let app = Router::new()
3239 .route(
3240 "/api/v1/sessions/{id}/output",
3241 get(
3242 |_path: Path<String>,
3243 _query: Query<std::collections::HashMap<String, String>>| async {
3244 r#"{"output":"stale output"}"#.to_owned()
3245 },
3246 ),
3247 )
3248 .route(
3249 "/api/v1/sessions/{id}",
3250 get(|_path: Path<String>| async {
3251 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"lost","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3252 }),
3253 );
3254
3255 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3256 let addr = listener.local_addr().unwrap();
3257 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3258 let base = format!("http://127.0.0.1:{}", addr.port());
3259
3260 let client = reqwest::Client::new();
3261 let mut buf = Vec::new();
3262 follow_logs(&client, &base, "test", 100, None, &mut buf)
3263 .await
3264 .unwrap();
3265
3266 let output = String::from_utf8(buf).unwrap();
3267 assert!(output.contains("stale output"));
3268 }
3269
3270 #[tokio::test]
3271 async fn test_execute_logs_follow_non_reqwest_error() {
3272 use axum::{Router, extract::Path, extract::Query, routing::get};
3273
3274 let app = Router::new()
3276 .route(
3277 "/api/v1/sessions/{id}/output",
3278 get(
3279 |_path: Path<String>,
3280 _query: Query<std::collections::HashMap<String, String>>| async {
3281 r#"{"output":"initial"}"#.to_owned()
3282 },
3283 ),
3284 )
3285 .route(
3286 "/api/v1/sessions/{id}",
3287 get(|_path: Path<String>| async { "not valid json".to_owned() }),
3288 );
3289
3290 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3291 let addr = listener.local_addr().unwrap();
3292 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3293 let node = format!("127.0.0.1:{}", addr.port());
3294
3295 let cli = Cli {
3296 node,
3297 token: None,
3298 command: Commands::Logs {
3299 name: "test".into(),
3300 lines: 100,
3301 follow: true,
3302 },
3303 };
3304 let err = execute(&cli).await.unwrap_err();
3305 let msg = err.to_string();
3307 assert!(
3308 msg.contains("expected ident"),
3309 "Expected serde parse error, got: {msg}"
3310 );
3311 }
3312
3313 #[test]
3314 fn test_cli_parse_spawn_with_guardrails() {
3315 let cli = Cli::try_parse_from([
3316 "pulpo",
3317 "spawn",
3318 "my-task",
3319 "--max-turns",
3320 "10",
3321 "--max-budget",
3322 "5.5",
3323 "--output-format",
3324 "json",
3325 "Do it",
3326 ])
3327 .unwrap();
3328 assert!(matches!(
3329 &cli.command,
3330 Commands::Spawn { max_turns, max_budget, output_format, .. }
3331 if *max_turns == Some(10) && *max_budget == Some(5.5)
3332 && output_format.as_deref() == Some("json")
3333 ));
3334 }
3335
3336 #[tokio::test]
3337 async fn test_fetch_session_status_connection_error() {
3338 let client = reqwest::Client::new();
3339 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3340 assert!(result.is_err());
3341 }
3342
3343 #[test]
3346 fn test_build_crontab_line() {
3347 let line = build_crontab_line(
3348 "nightly-review",
3349 "0 3 * * *",
3350 "/home/me/repo",
3351 "claude",
3352 "Review PRs",
3353 "localhost:7433",
3354 );
3355 assert_eq!(
3356 line,
3357 "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n"
3358 );
3359 }
3360
3361 #[test]
3362 fn test_crontab_install_success() {
3363 let crontab = "# existing cron\n0 * * * * echo hi\n";
3364 let line = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";
3365 let result = crontab_install(crontab, "my-job", line).unwrap();
3366 assert!(result.starts_with("# existing cron\n"));
3367 assert!(result.ends_with("#pulpo:my-job\n"));
3368 assert!(result.contains("echo hi"));
3369 }
3370
3371 #[test]
3372 fn test_crontab_install_duplicate_error() {
3373 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3374 let line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
3375 let err = crontab_install(crontab, "my-job", line).unwrap_err();
3376 assert!(err.to_string().contains("already exists"));
3377 }
3378
3379 #[test]
3380 fn test_crontab_list_empty() {
3381 assert_eq!(crontab_list(""), "No pulpo schedules.");
3382 }
3383
3384 #[test]
3385 fn test_crontab_list_no_pulpo_entries() {
3386 assert_eq!(crontab_list("0 * * * * echo hi\n"), "No pulpo schedules.");
3387 }
3388
3389 #[test]
3390 fn test_crontab_list_with_entries() {
3391 let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
3392 let output = crontab_list(crontab);
3393 assert!(output.contains("NAME"));
3394 assert!(output.contains("CRON"));
3395 assert!(output.contains("PAUSED"));
3396 assert!(output.contains("nightly"));
3397 assert!(output.contains("0 3 * * *"));
3398 assert!(output.contains("no"));
3399 }
3400
3401 #[test]
3402 fn test_crontab_list_paused_entry() {
3403 let crontab = "#0 3 * * * pulpo spawn task #pulpo:paused-job\n";
3404 let output = crontab_list(crontab);
3405 assert!(output.contains("paused-job"));
3406 assert!(output.contains("yes"));
3407 }
3408
3409 #[test]
3410 fn test_crontab_list_short_line() {
3411 let crontab = "badcron #pulpo:broken\n";
3413 let output = crontab_list(crontab);
3414 assert!(output.contains("broken"));
3415 assert!(output.contains('?'));
3416 }
3417
3418 #[test]
3419 fn test_crontab_remove_success() {
3420 let crontab = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
3421 let result = crontab_remove(crontab, "my-job").unwrap();
3422 assert!(result.contains("echo hi"));
3423 assert!(!result.contains("my-job"));
3424 }
3425
3426 #[test]
3427 fn test_crontab_remove_not_found() {
3428 let crontab = "0 * * * * echo hi\n";
3429 let err = crontab_remove(crontab, "ghost").unwrap_err();
3430 assert!(err.to_string().contains("not found"));
3431 }
3432
3433 #[test]
3434 fn test_crontab_pause_success() {
3435 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3436 let result = crontab_pause(crontab, "my-job").unwrap();
3437 assert!(result.starts_with('#'));
3438 assert!(result.contains("#pulpo:my-job"));
3439 }
3440
3441 #[test]
3442 fn test_crontab_pause_not_found() {
3443 let crontab = "0 * * * * echo hi\n";
3444 let err = crontab_pause(crontab, "ghost").unwrap_err();
3445 assert!(err.to_string().contains("not found or already paused"));
3446 }
3447
3448 #[test]
3449 fn test_crontab_pause_already_paused() {
3450 let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3451 let err = crontab_pause(crontab, "my-job").unwrap_err();
3452 assert!(err.to_string().contains("already paused"));
3453 }
3454
3455 #[test]
3456 fn test_crontab_resume_success() {
3457 let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3458 let result = crontab_resume(crontab, "my-job").unwrap();
3459 assert!(!result.starts_with('#'));
3460 assert!(result.contains("#pulpo:my-job"));
3461 }
3462
3463 #[test]
3464 fn test_crontab_resume_not_found() {
3465 let crontab = "0 * * * * echo hi\n";
3466 let err = crontab_resume(crontab, "ghost").unwrap_err();
3467 assert!(err.to_string().contains("not found or not paused"));
3468 }
3469
3470 #[test]
3471 fn test_crontab_resume_not_paused() {
3472 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3473 let err = crontab_resume(crontab, "my-job").unwrap_err();
3474 assert!(err.to_string().contains("not paused"));
3475 }
3476
3477 #[test]
3480 fn test_cli_parse_schedule_install() {
3481 let cli = Cli::try_parse_from([
3482 "pulpo",
3483 "schedule",
3484 "install",
3485 "nightly",
3486 "0 3 * * *",
3487 "--workdir",
3488 "/tmp/repo",
3489 "Review",
3490 "PRs",
3491 ])
3492 .unwrap();
3493 assert!(matches!(
3494 &cli.command,
3495 Commands::Schedule {
3496 action: ScheduleAction::Install { name, cron, workdir, provider, prompt }
3497 } if name == "nightly" && cron == "0 3 * * *" && workdir == "/tmp/repo"
3498 && provider == "claude" && prompt == &["Review", "PRs"]
3499 ));
3500 }
3501
3502 #[test]
3503 fn test_cli_parse_schedule_list() {
3504 let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3505 assert!(matches!(
3506 &cli.command,
3507 Commands::Schedule {
3508 action: ScheduleAction::List
3509 }
3510 ));
3511 }
3512
3513 #[test]
3514 fn test_cli_parse_schedule_remove() {
3515 let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3516 assert!(matches!(
3517 &cli.command,
3518 Commands::Schedule {
3519 action: ScheduleAction::Remove { name }
3520 } if name == "nightly"
3521 ));
3522 }
3523
3524 #[test]
3525 fn test_cli_parse_schedule_pause() {
3526 let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3527 assert!(matches!(
3528 &cli.command,
3529 Commands::Schedule {
3530 action: ScheduleAction::Pause { name }
3531 } if name == "nightly"
3532 ));
3533 }
3534
3535 #[test]
3536 fn test_cli_parse_schedule_resume() {
3537 let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3538 assert!(matches!(
3539 &cli.command,
3540 Commands::Schedule {
3541 action: ScheduleAction::Resume { name }
3542 } if name == "nightly"
3543 ));
3544 }
3545
3546 #[test]
3547 fn test_cli_parse_schedule_alias() {
3548 let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3549 assert!(matches!(
3550 &cli.command,
3551 Commands::Schedule {
3552 action: ScheduleAction::List
3553 }
3554 ));
3555 }
3556
3557 #[test]
3558 fn test_cli_parse_schedule_list_alias() {
3559 let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3560 assert!(matches!(
3561 &cli.command,
3562 Commands::Schedule {
3563 action: ScheduleAction::List
3564 }
3565 ));
3566 }
3567
3568 #[test]
3569 fn test_cli_parse_schedule_remove_alias() {
3570 let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3571 assert!(matches!(
3572 &cli.command,
3573 Commands::Schedule {
3574 action: ScheduleAction::Remove { name }
3575 } if name == "nightly"
3576 ));
3577 }
3578
3579 #[test]
3580 fn test_cli_parse_schedule_install_custom_provider() {
3581 let cli = Cli::try_parse_from([
3582 "pulpo",
3583 "schedule",
3584 "install",
3585 "daily",
3586 "0 9 * * *",
3587 "--workdir",
3588 "/tmp",
3589 "--provider",
3590 "codex",
3591 "Run tests",
3592 ])
3593 .unwrap();
3594 assert!(matches!(
3595 &cli.command,
3596 Commands::Schedule {
3597 action: ScheduleAction::Install { provider, .. }
3598 } if provider == "codex"
3599 ));
3600 }
3601
3602 #[tokio::test]
3603 async fn test_execute_schedule_via_execute() {
3604 let node = start_test_server().await;
3606 let cli = Cli {
3607 node,
3608 token: None,
3609 command: Commands::Schedule {
3610 action: ScheduleAction::List,
3611 },
3612 };
3613 let result = execute(&cli).await;
3614 assert!(result.is_ok() || result.is_err());
3616 }
3617
3618 #[test]
3619 fn test_schedule_action_debug() {
3620 let action = ScheduleAction::List;
3621 assert_eq!(format!("{action:?}"), "List");
3622 }
3623
3624 #[test]
3627 fn test_cli_parse_culture() {
3628 let cli = Cli::try_parse_from(["pulpo", "culture"]).unwrap();
3629 assert!(matches!(cli.command, Commands::Culture { .. }));
3630 }
3631
3632 #[test]
3633 fn test_cli_parse_culture_alias() {
3634 let cli = Cli::try_parse_from(["pulpo", "kn"]).unwrap();
3635 assert!(matches!(cli.command, Commands::Culture { .. }));
3636 }
3637
3638 #[test]
3639 fn test_cli_parse_culture_with_filters() {
3640 let cli = Cli::try_parse_from([
3641 "pulpo",
3642 "culture",
3643 "--kind",
3644 "failure",
3645 "--repo",
3646 "/tmp/repo",
3647 "--ink",
3648 "coder",
3649 "--limit",
3650 "5",
3651 ])
3652 .unwrap();
3653 match &cli.command {
3654 Commands::Culture {
3655 kind,
3656 repo,
3657 ink,
3658 limit,
3659 ..
3660 } => {
3661 assert_eq!(kind.as_deref(), Some("failure"));
3662 assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3663 assert_eq!(ink.as_deref(), Some("coder"));
3664 assert_eq!(*limit, 5);
3665 }
3666 _ => panic!("expected Culture command"),
3667 }
3668 }
3669
3670 #[test]
3671 fn test_cli_parse_culture_context() {
3672 let cli =
3673 Cli::try_parse_from(["pulpo", "culture", "--context", "--repo", "/tmp/repo"]).unwrap();
3674 match &cli.command {
3675 Commands::Culture { context, repo, .. } => {
3676 assert!(*context);
3677 assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3678 }
3679 _ => panic!("expected Culture command"),
3680 }
3681 }
3682
3683 #[test]
3684 fn test_format_culture_empty() {
3685 assert_eq!(format_culture(&[]), "No culture found.");
3686 }
3687
3688 #[test]
3689 fn test_format_culture_items() {
3690 use chrono::Utc;
3691 use pulpo_common::culture::{Culture, CultureKind};
3692 use uuid::Uuid;
3693
3694 let items = vec![
3695 Culture {
3696 id: Uuid::new_v4(),
3697 session_id: Uuid::new_v4(),
3698 kind: CultureKind::Summary,
3699 scope_repo: Some("/tmp/repo".into()),
3700 scope_ink: Some("coder".into()),
3701 title: "Fixed the auth bug".into(),
3702 body: "Details".into(),
3703 tags: vec!["claude".into(), "completed".into()],
3704 relevance: 0.7,
3705 created_at: Utc::now(),
3706 last_referenced_at: None,
3707 reference_count: 0,
3708 },
3709 Culture {
3710 id: Uuid::new_v4(),
3711 session_id: Uuid::new_v4(),
3712 kind: CultureKind::Failure,
3713 scope_repo: None,
3714 scope_ink: None,
3715 title: "OOM crash during build".into(),
3716 body: "Details".into(),
3717 tags: vec!["failure".into()],
3718 relevance: 0.9,
3719 created_at: Utc::now(),
3720 last_referenced_at: None,
3721 reference_count: 0,
3722 },
3723 ];
3724
3725 let output = format_culture(&items);
3726 assert!(output.contains("KIND"));
3727 assert!(output.contains("TITLE"));
3728 assert!(output.contains("summary"));
3729 assert!(output.contains("failure"));
3730 assert!(output.contains("Fixed the auth bug"));
3731 assert!(output.contains("repo"));
3732 assert!(output.contains("0.70"));
3733 }
3734
3735 #[test]
3736 fn test_format_culture_long_title_truncated() {
3737 use chrono::Utc;
3738 use pulpo_common::culture::{Culture, CultureKind};
3739 use uuid::Uuid;
3740
3741 let items = vec![Culture {
3742 id: Uuid::new_v4(),
3743 session_id: Uuid::new_v4(),
3744 kind: CultureKind::Summary,
3745 scope_repo: Some("/repo".into()),
3746 scope_ink: None,
3747 title: "A very long title that exceeds the maximum display width for culture items in the CLI".into(),
3748 body: "Body".into(),
3749 tags: vec![],
3750 relevance: 0.5,
3751 created_at: Utc::now(),
3752 last_referenced_at: None,
3753 reference_count: 0,
3754 }];
3755
3756 let output = format_culture(&items);
3757 assert!(output.contains('…'));
3758 }
3759
3760 #[test]
3761 fn test_cli_parse_culture_get() {
3762 let cli = Cli::try_parse_from(["pulpo", "culture", "--get", "abc-123"]).unwrap();
3763 match &cli.command {
3764 Commands::Culture { get, .. } => {
3765 assert_eq!(get.as_deref(), Some("abc-123"));
3766 }
3767 _ => panic!("expected Culture command"),
3768 }
3769 }
3770
3771 #[test]
3772 fn test_cli_parse_culture_delete() {
3773 let cli = Cli::try_parse_from(["pulpo", "culture", "--delete", "abc-123"]).unwrap();
3774 match &cli.command {
3775 Commands::Culture { delete, .. } => {
3776 assert_eq!(delete.as_deref(), Some("abc-123"));
3777 }
3778 _ => panic!("expected Culture command"),
3779 }
3780 }
3781
3782 #[test]
3783 fn test_cli_parse_culture_push() {
3784 let cli = Cli::try_parse_from(["pulpo", "culture", "--push"]).unwrap();
3785 match &cli.command {
3786 Commands::Culture { push, .. } => {
3787 assert!(*push);
3788 }
3789 _ => panic!("expected Culture command"),
3790 }
3791 }
3792
3793 #[test]
3794 fn test_format_culture_no_repo() {
3795 use chrono::Utc;
3796 use pulpo_common::culture::{Culture, CultureKind};
3797 use uuid::Uuid;
3798
3799 let items = vec![Culture {
3800 id: Uuid::new_v4(),
3801 session_id: Uuid::new_v4(),
3802 kind: CultureKind::Summary,
3803 scope_repo: None,
3804 scope_ink: None,
3805 title: "Global finding".into(),
3806 body: "Body".into(),
3807 tags: vec![],
3808 relevance: 0.5,
3809 created_at: Utc::now(),
3810 last_referenced_at: None,
3811 reference_count: 0,
3812 }];
3813
3814 let output = format_culture(&items);
3815 assert!(output.contains('-')); }
3817}