1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4 AuthTokenResponse, CreateSessionResponse, InterventionEventResponse, KnowledgeDeleteResponse,
5 KnowledgeItemResponse, KnowledgePushResponse, KnowledgeResponse, PeersResponse,
6};
7use pulpo_common::knowledge::Knowledge;
8use pulpo_common::session::Session;
9
// Top-level command-line arguments for the `pulpo` client.
//
// NOTE: plain `//` comments are used on purpose in this type. clap's derive turns `///` doc
// comments into help text, so doc comments here would change what `--help` prints.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version
)]
pub struct Cli {
    // Address of the daemon to talk to, as `host:port`. `base_url` prefixes it with `http://`.
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    // Explicit bearer token. When omitted and `node` is a localhost address, `resolve_token`
    // tries to discover one from the daemon instead.
    #[arg(long)]
    pub token: Option<String>,

    // The subcommand to run; dispatched in `execute`.
    #[command(subcommand)]
    pub command: Commands,
}
28
// All top-level subcommands of the CLI. Each variant is handled by one arm of `execute`.
//
// NOTE: plain `//` comments are used on purpose. clap's derive turns `///` doc comments into
// help text, so doc comments here would change what `--help` prints.
#[derive(Subcommand, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    // Attach the terminal to a session (via `tmux attach-session`). Refused for sessions whose
    // status is stale, completed or dead.
    #[command(visible_alias = "a")]
    Attach {
        // Session name, used in the `/api/v1/sessions/{name}` path.
        name: String,
    },

    // Send text to a session's input endpoint.
    #[command(visible_alias = "i")]
    Input {
        // Session name.
        name: String,
        // Text to send; `execute` sends a single newline when this is omitted.
        text: Option<String>,
    },

    // Create a new session. Optional fields are only added to the request body when given.
    #[command(visible_alias = "s")]
    Spawn {
        // Working directory; `execute` falls back to the current directory (or ".").
        #[arg(long)]
        workdir: Option<String>,

        // Optional session name.
        #[arg(long)]
        name: Option<String>,

        // Optional provider identifier, forwarded as `provider`.
        #[arg(long)]
        provider: Option<String>,

        // Selects mode "autonomous" instead of "interactive".
        #[arg(long)]
        auto: bool,

        // Forwarded as `unrestricted: true` when set.
        #[arg(long)]
        unrestricted: bool,

        // Forwarded as `model`.
        #[arg(long)]
        model: Option<String>,

        // Forwarded as `system_prompt`.
        #[arg(long)]
        system_prompt: Option<String>,

        // Comma-separated list, forwarded as the `allowed_tools` array.
        #[arg(long, value_delimiter = ',')]
        allowed_tools: Option<Vec<String>>,

        // Forwarded as `ink`.
        #[arg(long)]
        ink: Option<String>,

        // Forwarded as `max_turns`.
        #[arg(long)]
        max_turns: Option<u32>,

        // Forwarded as `max_budget_usd`.
        #[arg(long)]
        max_budget: Option<f64>,

        // Forwarded as `output_format`.
        #[arg(long)]
        output_format: Option<String>,

        // Forwarded as `worktree: true` when set.
        #[arg(long)]
        worktree: bool,

        // Positional words, joined with spaces to form the prompt; omitted from the body if empty.
        prompt: Vec<String>,
    },

    // List sessions as a table.
    #[command(visible_alias = "ls")]
    List,

    // Print a session's captured output.
    #[command(visible_alias = "l")]
    Logs {
        // Session name.
        name: String,

        // Number of output lines to request.
        #[arg(long, default_value = "100")]
        lines: usize,

        // Keep polling and print new output until the session reaches a terminal status.
        #[arg(short, long)]
        follow: bool,
    },

    // Kill a session (POST `/sessions/{name}/kill`).
    #[command(visible_alias = "k")]
    Kill {
        // Session name.
        name: String,
    },

    // Delete a session (DELETE `/sessions/{name}`).
    #[command(visible_alias = "rm")]
    Delete {
        // Session name.
        name: String,
    },

    // Resume a session (POST `/sessions/{name}/resume`).
    #[command(visible_alias = "r")]
    Resume {
        // Session name.
        name: String,
    },

    // Show the local node and its peers as a table.
    #[command(visible_alias = "n")]
    Nodes,

    // List intervention events recorded for a session.
    #[command(visible_alias = "iv")]
    Interventions {
        // Session name.
        name: String,
    },

    // Open the node's base URL in the system browser.
    Ui,

    // Query or manage knowledge items. `--get`, `--delete` and `--push` are checked in that
    // order and short-circuit; otherwise a listing request is made.
    #[command(visible_alias = "kn")]
    Knowledge {
        // Listing filter, sent as `session_id` (ignored with `--context`).
        #[arg(long)]
        session: Option<String>,

        // Listing filter, sent as `kind` (ignored with `--context`).
        #[arg(long)]
        kind: Option<String>,

        // Sent as `repo` for listings, or as `workdir` with `--context`.
        #[arg(long)]
        repo: Option<String>,

        // Sent as `ink` in both listing and context queries.
        #[arg(long)]
        ink: Option<String>,

        // Sent as `limit`.
        #[arg(long, default_value = "20")]
        limit: usize,

        // Query the `/knowledge/context` endpoint instead of `/knowledge`.
        #[arg(long)]
        context: bool,

        // Fetch a single item by id.
        #[arg(long)]
        get: Option<String>,

        // Delete a single item by id.
        #[arg(long)]
        delete: Option<String>,

        // POST to `/knowledge/push` and print the returned message.
        #[arg(long)]
        push: bool,
    },

    // Manage crontab-based schedules on the local machine.
    #[command(visible_alias = "sched")]
    Schedule {
        #[command(subcommand)]
        action: ScheduleAction,
    },
}
208
// Actions of the `schedule` subcommand; each one reads and/or rewrites the user's crontab
// (see `execute_schedule`).
//
// NOTE: plain `//` comments are used on purpose. clap's derive turns `///` doc comments into
// help text, so doc comments here would change what `--help` prints.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    // Append a crontab line that runs `pulpo ... spawn ... --auto <prompt>`.
    Install {
        // Schedule name; stored in the trailing `#pulpo:<name>` tag of the crontab line.
        name: String,
        // Cron expression placed verbatim at the start of the crontab line.
        cron: String,
        // Working directory passed to `spawn --workdir`.
        #[arg(long)]
        workdir: String,
        // Provider passed to `spawn --provider`.
        #[arg(long, default_value = "claude")]
        provider: String,
        // Positional words, joined with spaces to form the prompt.
        prompt: Vec<String>,
    },
    // Show all crontab lines carrying the pulpo tag.
    #[command(alias = "ls")]
    List,
    // Delete the crontab line of a schedule.
    #[command(alias = "rm")]
    Remove {
        name: String,
    },
    // Comment out the crontab line of a schedule.
    Pause {
        name: String,
    },
    // Uncomment the crontab line of a paused schedule.
    Resume {
        name: String,
    },
}
246
/// Builds the HTTP base URL for a daemon address.
///
/// `node` is appended verbatim after the `http://` scheme, so it is expected to be a
/// `host:port` pair without a scheme of its own.
pub fn base_url(node: &str) -> String {
    const SCHEME: &str = "http://";
    let mut url = String::with_capacity(SCHEME.len() + node.len());
    url.push_str(SCHEME);
    url.push_str(node);
    url
}
251
/// JSON body deserialized from the session output endpoint (see `fetch_output`).
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// The captured output text; this is the only field that is read.
    output: String,
}
257
258fn format_sessions(sessions: &[Session]) -> String {
260 if sessions.is_empty() {
261 return "No sessions.".into();
262 }
263 let mut lines = vec![format!(
264 "{:<20} {:<12} {:<10} {:<14} {}",
265 "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
266 )];
267 for s in sessions {
268 let prompt_display = if s.prompt.len() > 40 {
269 format!("{}...", &s.prompt[..37])
270 } else {
271 s.prompt.clone()
272 };
273 let status_display = if s.waiting_for_input {
274 "waiting".to_owned()
275 } else {
276 s.status.to_string()
277 };
278 lines.push(format!(
279 "{:<20} {:<12} {:<10} {:<14} {}",
280 s.name, status_display, s.provider, s.mode, prompt_display
281 ));
282 }
283 lines.join("\n")
284}
285
286fn format_nodes(resp: &PeersResponse) -> String {
288 let mut lines = vec![format!(
289 "{:<20} {:<25} {:<10} {}",
290 "NAME", "ADDRESS", "STATUS", "SESSIONS"
291 )];
292 lines.push(format!(
293 "{:<20} {:<25} {:<10} {}",
294 resp.local.name, "(local)", "online", "-"
295 ));
296 for p in &resp.peers {
297 let sessions = p
298 .session_count
299 .map_or_else(|| "-".into(), |c| c.to_string());
300 lines.push(format!(
301 "{:<20} {:<25} {:<10} {}",
302 p.name, p.address, p.status, sessions
303 ));
304 }
305 lines.join("\n")
306}
307
308fn format_interventions(events: &[InterventionEventResponse]) -> String {
310 if events.is_empty() {
311 return "No intervention events.".into();
312 }
313 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
314 for e in events {
315 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
316 }
317 lines.join("\n")
318}
319
320fn format_knowledge(items: &[Knowledge]) -> String {
322 if items.is_empty() {
323 return "No knowledge found.".into();
324 }
325 let mut lines = vec![format!(
326 "{:<10} {:<40} {:<10} {:<6} {}",
327 "KIND", "TITLE", "REPO", "REL", "TAGS"
328 )];
329 for k in items {
330 let title = if k.title.len() > 38 {
331 format!("{}…", &k.title[..37])
332 } else {
333 k.title.clone()
334 };
335 let repo = k
336 .scope_repo
337 .as_deref()
338 .and_then(|r| r.rsplit('/').next())
339 .unwrap_or("-");
340 let tags = k.tags.join(",");
341 lines.push(format!(
342 "{:<10} {:<40} {:<10} {:<6.2} {}",
343 k.kind, title, repo, k.relevance, tags
344 ));
345 }
346 lines.join("\n")
347}
348
/// Builds (without running) the command that opens `url` in the default browser.
///
/// Uses `open` on macOS and `xdg-open` on every other target, with `url` as the only argument.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    let opener = if cfg!(target_os = "macos") {
        "open"
    } else {
        // Linux, and the fallback for any other platform.
        "xdg-open"
    };
    let mut command = std::process::Command::new(opener);
    command.arg(url);
    command
}
372
373#[cfg(not(coverage))]
375fn open_browser(url: &str) -> Result<()> {
376 build_open_command(url).status()?;
377 Ok(())
378}
379
380#[cfg(coverage)]
382fn open_browser(_url: &str) -> Result<()> {
383 Ok(())
384}
385
/// Builds (without running) `tmux attach-session -t <backend_session_id>`.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    let mut tmux = std::process::Command::new("tmux");
    tmux.arg("attach-session").arg("-t").arg(backend_session_id);
    tmux
}
394
395#[cfg(not(any(test, coverage)))]
397fn attach_session(backend_session_id: &str) -> Result<()> {
398 let status = build_attach_command(backend_session_id).status()?;
399 if !status.success() {
400 anyhow::bail!("attach failed with {status}");
401 }
402 Ok(())
403}
404
405#[cfg(any(test, coverage))]
407#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
408fn attach_session(_backend_session_id: &str) -> Result<()> {
409 Ok(())
410}
411
412fn api_error(text: &str) -> anyhow::Error {
414 serde_json::from_str::<serde_json::Value>(text)
415 .ok()
416 .and_then(|v| v["error"].as_str().map(String::from))
417 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
418}
419
420async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
422 if resp.status().is_success() {
423 Ok(resp.text().await?)
424 } else {
425 let text = resp.text().await?;
426 Err(api_error(&text))
427 }
428}
429
430fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
432 if err.is_connect() {
433 anyhow::anyhow!(
434 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
435 )
436 } else {
437 anyhow::anyhow!("Network error connecting to {node}: {err}")
438 }
439}
440
/// Reports whether `node` (a `host` or `host:port` string) refers to the local machine.
///
/// Recognized forms: `localhost`, `127.0.0.1` (each with an optional `:port`), the bare IPv6
/// loopback `::1`, and anything starting with the bracketed form `[::1]`.
fn is_localhost(node: &str) -> bool {
    // Everything before the first ':' is the host; without a ':' the whole string is the host.
    let host = node.split_once(':').map_or(node, |(head, _)| head);
    matches!(host, "localhost" | "127.0.0.1") || node == "::1" || node.starts_with("[::1]")
}
446
447async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
449 let resp = client
450 .get(format!("{base}/api/v1/auth/token"))
451 .send()
452 .await
453 .ok()?;
454 let body: AuthTokenResponse = resp.json().await.ok()?;
455 if body.token.is_empty() {
456 None
457 } else {
458 Some(body.token)
459 }
460}
461
462async fn resolve_token(
464 client: &reqwest::Client,
465 base: &str,
466 node: &str,
467 explicit: Option<&str>,
468) -> Option<String> {
469 if let Some(t) = explicit {
470 return Some(t.to_owned());
471 }
472 if is_localhost(node) {
473 return discover_token(client, base).await;
474 }
475 None
476}
477
478fn authed_get(
480 client: &reqwest::Client,
481 url: String,
482 token: Option<&str>,
483) -> reqwest::RequestBuilder {
484 let req = client.get(url);
485 if let Some(t) = token {
486 req.bearer_auth(t)
487 } else {
488 req
489 }
490}
491
492fn authed_post(
494 client: &reqwest::Client,
495 url: String,
496 token: Option<&str>,
497) -> reqwest::RequestBuilder {
498 let req = client.post(url);
499 if let Some(t) = token {
500 req.bearer_auth(t)
501 } else {
502 req
503 }
504}
505
506fn authed_delete(
508 client: &reqwest::Client,
509 url: String,
510 token: Option<&str>,
511) -> reqwest::RequestBuilder {
512 let req = client.delete(url);
513 if let Some(t) = token {
514 req.bearer_auth(t)
515 } else {
516 req
517 }
518}
519
520async fn fetch_output(
522 client: &reqwest::Client,
523 base: &str,
524 name: &str,
525 lines: usize,
526 token: Option<&str>,
527) -> Result<String> {
528 let resp = authed_get(
529 client,
530 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
531 token,
532 )
533 .send()
534 .await?;
535 let text = ok_or_api_error(resp).await?;
536 let output: OutputResponse = serde_json::from_str(&text)?;
537 Ok(output.output)
538}
539
540async fn fetch_session_status(
542 client: &reqwest::Client,
543 base: &str,
544 name: &str,
545 token: Option<&str>,
546) -> Result<String> {
547 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
548 .send()
549 .await?;
550 let text = ok_or_api_error(resp).await?;
551 let session: Session = serde_json::from_str(&text)?;
552 Ok(session.status.to_string())
553}
554
/// Returns the part of `new` that comes after the text already shown in `prev`.
///
/// Both arguments are snapshots of the same scrolling output. The function looks for the last
/// line of `prev` inside `new` (searching from the end) and checks that the lines leading up to
/// it match the tail of `prev`; everything after that overlap is the new text.
///
/// * `prev` empty: all of `new` is returned.
/// * `new` has no lines, or ends exactly at the overlap: `""` is returned.
/// * no overlap found: all of `new` is returned.
///
/// The returned slice borrows from `new` and keeps its original line terminators.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // A non-empty string always yields at least one line, so this fallback is never expected to
    // trigger; it only avoids an index panic.
    let last_prev = match prev_lines.last() {
        Some(line) => *line,
        None => return new,
    };

    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Compare as many lines as both snapshots can offer, ending at the candidate line.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail == new_overlap {
            // Sum the real byte lengths of the consumed lines, terminators included.
            // `split_inclusive('\n')` yields the same lines as `lines()` but keeps "\n"/"\r\n",
            // so the offset is right for CRLF input too (assuming one byte per terminator
            // would land inside the "\r\n" and re-emit part of the overlap).
            let consumed: usize = new
                .split_inclusive('\n')
                .take(i + 1)
                .map(str::len)
                .sum();
            return &new[consumed..];
        }
    }

    new
}
597
598async fn follow_logs(
600 client: &reqwest::Client,
601 base: &str,
602 name: &str,
603 lines: usize,
604 token: Option<&str>,
605 writer: &mut (dyn std::io::Write + Send),
606) -> Result<()> {
607 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
608 write!(writer, "{prev_output}")?;
609
610 loop {
611 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
612
613 let status = fetch_session_status(client, base, name, token).await?;
615 let is_terminal = status == "completed" || status == "dead" || status == "stale";
616
617 let new_output = fetch_output(client, base, name, lines, token).await?;
619
620 let diff = diff_output(&prev_output, &new_output);
621 if !diff.is_empty() {
622 write!(writer, "{diff}")?;
623 }
624 prev_output = new_output;
625
626 if is_terminal {
627 break;
628 }
629 }
630 Ok(())
631}
632
/// Marker that, followed by the schedule name, forms the trailing tag of every crontab line
/// written by `build_crontab_line`. The other crontab helpers use it to find those lines.
#[cfg_attr(coverage, allow(dead_code))]
const CRONTAB_TAG: &str = "#pulpo:";
637
638#[cfg(not(coverage))]
640fn read_crontab() -> Result<String> {
641 let output = std::process::Command::new("crontab").arg("-l").output()?;
642 if output.status.success() {
643 Ok(String::from_utf8_lossy(&output.stdout).to_string())
644 } else {
645 Ok(String::new())
646 }
647}
648
649#[cfg(not(coverage))]
651fn write_crontab(content: &str) -> Result<()> {
652 use std::io::Write;
653 let mut child = std::process::Command::new("crontab")
654 .arg("-")
655 .stdin(std::process::Stdio::piped())
656 .spawn()?;
657 child
658 .stdin
659 .as_mut()
660 .unwrap()
661 .write_all(content.as_bytes())?;
662 let status = child.wait()?;
663 if !status.success() {
664 anyhow::bail!("crontab write failed");
665 }
666 Ok(())
667}
668
669#[cfg_attr(coverage, allow(dead_code))]
671fn build_crontab_line(
672 name: &str,
673 cron: &str,
674 workdir: &str,
675 provider: &str,
676 prompt: &str,
677 node: &str,
678) -> String {
679 format!(
680 "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
681 )
682}
683
684#[cfg_attr(coverage, allow(dead_code))]
686fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
687 let tag = format!("{CRONTAB_TAG}{name}");
688 if crontab.contains(&tag) {
689 anyhow::bail!("schedule \"{name}\" already exists — remove it first");
690 }
691 let mut result = crontab.to_owned();
692 result.push_str(line);
693 Ok(result)
694}
695
696#[cfg_attr(coverage, allow(dead_code))]
698fn crontab_list(crontab: &str) -> String {
699 let entries: Vec<&str> = crontab
700 .lines()
701 .filter(|l| l.contains(CRONTAB_TAG))
702 .collect();
703 if entries.is_empty() {
704 return "No pulpo schedules.".into();
705 }
706 let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
707 for entry in entries {
708 let paused = entry.starts_with('#');
709 let raw = entry.trim_start_matches('#').trim();
710 let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
711 let parts: Vec<&str> = raw.splitn(6, ' ').collect();
712 let cron_expr = if parts.len() >= 5 {
713 parts[..5].join(" ")
714 } else {
715 "?".into()
716 };
717 lines.push(format!(
718 "{:<20} {:<15} {}",
719 name,
720 cron_expr,
721 if paused { "yes" } else { "no" }
722 ));
723 }
724 lines.join("\n")
725}
726
727#[cfg_attr(coverage, allow(dead_code))]
729fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
730 use std::fmt::Write;
731 let tag = format!("{CRONTAB_TAG}{name}");
732 let filtered =
733 crontab
734 .lines()
735 .filter(|l| !l.contains(&tag))
736 .fold(String::new(), |mut acc, l| {
737 writeln!(acc, "{l}").unwrap();
738 acc
739 });
740 if filtered.len() == crontab.len() {
741 anyhow::bail!("schedule \"{name}\" not found");
742 }
743 Ok(filtered)
744}
745
746#[cfg_attr(coverage, allow(dead_code))]
748fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
749 use std::fmt::Write;
750 let tag = format!("{CRONTAB_TAG}{name}");
751 let mut found = false;
752 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
753 if l.contains(&tag) && !l.starts_with('#') {
754 found = true;
755 writeln!(acc, "#{l}").unwrap();
756 } else {
757 writeln!(acc, "{l}").unwrap();
758 }
759 acc
760 });
761 if !found {
762 anyhow::bail!("schedule \"{name}\" not found or already paused");
763 }
764 Ok(updated)
765}
766
767#[cfg_attr(coverage, allow(dead_code))]
769fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
770 use std::fmt::Write;
771 let tag = format!("{CRONTAB_TAG}{name}");
772 let mut found = false;
773 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
774 if l.contains(&tag) && l.starts_with('#') {
775 found = true;
776 writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
777 } else {
778 writeln!(acc, "{l}").unwrap();
779 }
780 acc
781 });
782 if !found {
783 anyhow::bail!("schedule \"{name}\" not found or not paused");
784 }
785 Ok(updated)
786}
787
788#[cfg(not(coverage))]
790fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
791 match action {
792 ScheduleAction::Install {
793 name,
794 cron,
795 workdir,
796 provider,
797 prompt,
798 } => {
799 let crontab = read_crontab()?;
800 let joined_prompt = prompt.join(" ");
801 let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
802 let updated = crontab_install(&crontab, name, &line)?;
803 write_crontab(&updated)?;
804 Ok(format!("Installed schedule \"{name}\""))
805 }
806 ScheduleAction::List => {
807 let crontab = read_crontab()?;
808 Ok(crontab_list(&crontab))
809 }
810 ScheduleAction::Remove { name } => {
811 let crontab = read_crontab()?;
812 let updated = crontab_remove(&crontab, name)?;
813 write_crontab(&updated)?;
814 Ok(format!("Removed schedule \"{name}\""))
815 }
816 ScheduleAction::Pause { name } => {
817 let crontab = read_crontab()?;
818 let updated = crontab_pause(&crontab, name)?;
819 write_crontab(&updated)?;
820 Ok(format!("Paused schedule \"{name}\""))
821 }
822 ScheduleAction::Resume { name } => {
823 let crontab = read_crontab()?;
824 let updated = crontab_resume(&crontab, name)?;
825 write_crontab(&updated)?;
826 Ok(format!("Resumed schedule \"{name}\""))
827 }
828 }
829}
830
831#[cfg(coverage)]
833fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
834 Ok(String::new())
835}
836
/// Runs the parsed command line against the daemon at `cli.node` and returns the text to print.
///
/// A bearer token is resolved once up front (explicit `--token`, or discovered for localhost
/// nodes) and attached to every API request. Each subcommand maps to one or more calls under
/// `/api/v1/...`; `Schedule` is the exception and is delegated to `execute_schedule`.
///
/// # Errors
/// Transport failures are turned into user-facing messages by `friendly_error`; non-success
/// responses surface the API's error message via `ok_or_api_error`; malformed JSON bodies are
/// reported as deserialization errors.
#[allow(clippy::too_many_lines)]
pub async fn execute(cli: &Cli) -> Result<String> {
    let url = base_url(&cli.node);
    let client = reqwest::Client::new();
    let node = &cli.node;
    let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;

    match &cli.command {
        Commands::Attach { name } => {
            // Look the session up first so that finished or stale sessions are rejected with a
            // helpful message instead of a failing attach.
            let resp = authed_get(
                &client,
                format!("{url}/api/v1/sessions/{name}"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let session: Session = serde_json::from_str(&text)?;
            match session.status.to_string().as_str() {
                "stale" => {
                    anyhow::bail!(
                        "Session \"{name}\" is stale (agent process died). Resume it first:\n  pulpo resume {name}"
                    );
                }
                "completed" | "dead" => {
                    anyhow::bail!(
                        "Session \"{name}\" is {} — cannot attach to a finished session.",
                        session.status
                    );
                }
                _ => {}
            }
            // Fall back to the session name when the API reports no backend session id.
            let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
            attach_session(&backend_id)?;
            Ok(format!("Detached from session {name}."))
        }
        Commands::Input { name, text } => {
            // Without explicit text a single newline is sent.
            let input_text = text.as_deref().unwrap_or("\n");
            let body = serde_json::json!({ "text": input_text });
            let resp = authed_post(
                &client,
                format!("{url}/api/v1/sessions/{name}/input"),
                token.as_deref(),
            )
            .json(&body)
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            ok_or_api_error(resp).await?;
            Ok(format!("Sent input to session {name}."))
        }
        Commands::List => {
            let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let sessions: Vec<Session> = serde_json::from_str(&text)?;
            Ok(format_sessions(&sessions))
        }
        Commands::Nodes => {
            let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let resp: PeersResponse = serde_json::from_str(&text)?;
            Ok(format_nodes(&resp))
        }
        Commands::Spawn {
            workdir,
            name,
            provider,
            auto,
            unrestricted,
            model,
            system_prompt,
            allowed_tools,
            ink,
            max_turns,
            max_budget,
            output_format,
            worktree,
            prompt,
        } => {
            let prompt_text = prompt.join(" ");
            let mode = if *auto { "autonomous" } else { "interactive" };
            // Default the working directory to the current directory, or "." if that cannot be
            // determined.
            let resolved_workdir = workdir.clone().unwrap_or_else(|| {
                std::env::current_dir()
                    .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
            });
            // `workdir` and `mode` are always sent; every other key is added only when the
            // corresponding option was given.
            let mut body = serde_json::json!({
                "workdir": resolved_workdir,
                "mode": mode,
            });
            if !prompt_text.is_empty() {
                body["prompt"] = serde_json::json!(prompt_text);
            }
            if let Some(p) = provider {
                body["provider"] = serde_json::json!(p);
            }
            if *unrestricted {
                body["unrestricted"] = serde_json::json!(true);
            }
            if let Some(n) = name {
                body["name"] = serde_json::json!(n);
            }
            if let Some(m) = model {
                body["model"] = serde_json::json!(m);
            }
            if let Some(sp) = system_prompt {
                body["system_prompt"] = serde_json::json!(sp);
            }
            if let Some(tools) = allowed_tools {
                body["allowed_tools"] = serde_json::json!(tools);
            }
            if let Some(p) = ink {
                body["ink"] = serde_json::json!(p);
            }
            if let Some(mt) = max_turns {
                body["max_turns"] = serde_json::json!(mt);
            }
            if let Some(mb) = max_budget {
                body["max_budget_usd"] = serde_json::json!(mb);
            }
            if let Some(of) = output_format {
                body["output_format"] = serde_json::json!(of);
            }
            if *worktree {
                body["worktree"] = serde_json::json!(true);
            }
            let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
                .json(&body)
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let resp: CreateSessionResponse = serde_json::from_str(&text)?;
            let mut msg = format!(
                "Created session \"{}\" ({})",
                resp.session.name, resp.session.id
            );
            // Append any warnings returned by the API, one per line.
            for w in &resp.warnings {
                use std::fmt::Write;
                let _ = write!(msg, "\n  Warning: {w}");
            }
            Ok(msg)
        }
        Commands::Kill { name } => {
            let resp = authed_post(
                &client,
                format!("{url}/api/v1/sessions/{name}/kill"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            ok_or_api_error(resp).await?;
            Ok(format!("Session {name} killed."))
        }
        Commands::Delete { name } => {
            let resp = authed_delete(
                &client,
                format!("{url}/api/v1/sessions/{name}"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            ok_or_api_error(resp).await?;
            Ok(format!("Session {name} deleted."))
        }
        Commands::Logs {
            name,
            lines,
            follow,
        } => {
            if *follow {
                // Output is streamed straight to stdout, so the returned string is empty.
                let mut stdout = std::io::stdout();
                follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
                    .await
                    .map_err(|e| {
                        // Only transport errors get the friendly rewrite; everything else is
                        // passed through unchanged.
                        match e.downcast::<reqwest::Error>() {
                            Ok(re) => friendly_error(&re, node),
                            Err(other) => other,
                        }
                    })?;
                Ok(String::new())
            } else {
                let output = fetch_output(&client, &url, name, *lines, token.as_deref())
                    .await
                    .map_err(|e| match e.downcast::<reqwest::Error>() {
                        Ok(re) => friendly_error(&re, node),
                        Err(other) => other,
                    })?;
                Ok(output)
            }
        }
        Commands::Interventions { name } => {
            let resp = authed_get(
                &client,
                format!("{url}/api/v1/sessions/{name}/interventions"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
            Ok(format_interventions(&events))
        }
        Commands::Ui => {
            // The dashboard is opened at the node's base URL.
            let dashboard = base_url(&cli.node);
            open_browser(&dashboard)?;
            Ok(format!("Opening {dashboard}"))
        }
        Commands::Resume { name } => {
            let resp = authed_post(
                &client,
                format!("{url}/api/v1/sessions/{name}/resume"),
                token.as_deref(),
            )
            .send()
            .await
            .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let session: Session = serde_json::from_str(&text)?;
            Ok(format!("Resumed session \"{}\"", session.name))
        }
        Commands::Knowledge {
            session,
            kind,
            repo,
            ink,
            limit,
            context,
            get,
            delete,
            push,
        } => {
            // `--get`, `--delete` and `--push` are checked in this order; the first one present
            // is executed and returns early.
            if let Some(id) = get {
                let endpoint = format!("{url}/api/v1/knowledge/{id}");
                let resp = authed_get(&client, endpoint, token.as_deref())
                    .send()
                    .await
                    .map_err(|e| friendly_error(&e, node))?;
                let text = ok_or_api_error(resp).await?;
                let resp: KnowledgeItemResponse = serde_json::from_str(&text)?;
                return Ok(format_knowledge(&[resp.knowledge]));
            }

            if let Some(id) = delete {
                let endpoint = format!("{url}/api/v1/knowledge/{id}");
                let resp = authed_delete(&client, endpoint, token.as_deref())
                    .send()
                    .await
                    .map_err(|e| friendly_error(&e, node))?;
                let text = ok_or_api_error(resp).await?;
                let resp: KnowledgeDeleteResponse = serde_json::from_str(&text)?;
                return Ok(if resp.deleted {
                    format!("Deleted knowledge item {id}")
                } else {
                    format!("Knowledge item {id} not found")
                });
            }

            if *push {
                let endpoint = format!("{url}/api/v1/knowledge/push");
                let resp = authed_post(&client, endpoint, token.as_deref())
                    .send()
                    .await
                    .map_err(|e| friendly_error(&e, node))?;
                let text = ok_or_api_error(resp).await?;
                let resp: KnowledgePushResponse = serde_json::from_str(&text)?;
                return Ok(resp.message);
            }

            // Listing / context query.
            // NOTE(review): filter values are interpolated into the query string without
            // percent-encoding; values containing `&`, `#` or spaces would produce a malformed
            // query — confirm whether encoding is needed here.
            let mut params = vec![format!("limit={limit}")];
            let endpoint = if *context {
                // In context mode `--repo` is sent as `workdir`; `--session` and `--kind` are
                // not used.
                if let Some(r) = repo {
                    params.push(format!("workdir={r}"));
                }
                if let Some(i) = ink {
                    params.push(format!("ink={i}"));
                }
                format!("{url}/api/v1/knowledge/context?{}", params.join("&"))
            } else {
                if let Some(s) = session {
                    params.push(format!("session_id={s}"));
                }
                if let Some(k) = kind {
                    params.push(format!("kind={k}"));
                }
                if let Some(r) = repo {
                    params.push(format!("repo={r}"));
                }
                if let Some(i) = ink {
                    params.push(format!("ink={i}"));
                }
                format!("{url}/api/v1/knowledge?{}", params.join("&"))
            };
            let resp = authed_get(&client, endpoint, token.as_deref())
                .send()
                .await
                .map_err(|e| friendly_error(&e, node))?;
            let text = ok_or_api_error(resp).await?;
            let resp: KnowledgeResponse = serde_json::from_str(&text)?;
            Ok(format_knowledge(&resp.knowledge))
        }
        Commands::Schedule { action } => execute_schedule(action, node),
    }
}
1160
1161#[cfg(test)]
1162mod tests {
1163 use super::*;
1164
1165 #[test]
1166 fn test_base_url() {
1167 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1168 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1169 }
1170
1171 #[test]
1172 fn test_cli_parse_list() {
1173 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1174 assert_eq!(cli.node, "localhost:7433");
1175 assert!(matches!(cli.command, Commands::List));
1176 }
1177
1178 #[test]
1179 fn test_cli_parse_nodes() {
1180 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1181 assert!(matches!(cli.command, Commands::Nodes));
1182 }
1183
1184 #[test]
1185 fn test_cli_parse_ui() {
1186 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1187 assert!(matches!(cli.command, Commands::Ui));
1188 }
1189
1190 #[test]
1191 fn test_cli_parse_ui_custom_node() {
1192 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1193 assert!(matches!(cli.command, Commands::Ui));
1194 assert_eq!(cli.node, "mac-mini:7433");
1195 }
1196
1197 #[test]
1198 fn test_build_open_command() {
1199 let cmd = build_open_command("http://localhost:7433");
1200 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1201 assert_eq!(args, vec!["http://localhost:7433"]);
1202 #[cfg(target_os = "macos")]
1203 assert_eq!(cmd.get_program(), "open");
1204 #[cfg(target_os = "linux")]
1205 assert_eq!(cmd.get_program(), "xdg-open");
1206 }
1207
1208 #[test]
1209 fn test_cli_parse_spawn() {
1210 let cli = Cli::try_parse_from([
1211 "pulpo",
1212 "spawn",
1213 "--workdir",
1214 "/tmp/repo",
1215 "Fix",
1216 "the",
1217 "bug",
1218 ])
1219 .unwrap();
1220 assert!(matches!(
1221 &cli.command,
1222 Commands::Spawn { workdir, provider, auto, unrestricted, prompt, .. }
1223 if workdir.as_deref() == Some("/tmp/repo") && provider.is_none() && !auto
1224 && !unrestricted && prompt == &["Fix", "the", "bug"]
1225 ));
1226 }
1227
1228 #[test]
1229 fn test_cli_parse_spawn_with_provider() {
1230 let cli = Cli::try_parse_from([
1231 "pulpo",
1232 "spawn",
1233 "--workdir",
1234 "/tmp",
1235 "--provider",
1236 "codex",
1237 "Do it",
1238 ])
1239 .unwrap();
1240 assert!(matches!(
1241 &cli.command,
1242 Commands::Spawn { provider, .. } if provider.as_deref() == Some("codex")
1243 ));
1244 }
1245
1246 #[test]
1247 fn test_cli_parse_spawn_auto() {
1248 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "--auto", "Do it"])
1249 .unwrap();
1250 assert!(matches!(
1251 &cli.command,
1252 Commands::Spawn { auto, .. } if *auto
1253 ));
1254 }
1255
1256 #[test]
1257 fn test_cli_parse_spawn_unrestricted() {
1258 let cli = Cli::try_parse_from([
1259 "pulpo",
1260 "spawn",
1261 "--workdir",
1262 "/tmp",
1263 "--unrestricted",
1264 "Do it",
1265 ])
1266 .unwrap();
1267 assert!(matches!(
1268 &cli.command,
1269 Commands::Spawn { unrestricted, .. } if *unrestricted
1270 ));
1271 }
1272
1273 #[test]
1274 fn test_cli_parse_spawn_unrestricted_default() {
1275 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "Do it"]).unwrap();
1276 assert!(matches!(
1277 &cli.command,
1278 Commands::Spawn { unrestricted, .. } if !unrestricted
1279 ));
1280 }
1281
1282 #[test]
1283 fn test_cli_parse_spawn_with_name() {
1284 let cli = Cli::try_parse_from([
1285 "pulpo",
1286 "spawn",
1287 "--workdir",
1288 "/tmp/repo",
1289 "--name",
1290 "my-task",
1291 "Fix it",
1292 ])
1293 .unwrap();
1294 assert!(matches!(
1295 &cli.command,
1296 Commands::Spawn { workdir, name, .. }
1297 if workdir.as_deref() == Some("/tmp/repo") && name.as_deref() == Some("my-task")
1298 ));
1299 }
1300
1301 #[test]
1302 fn test_cli_parse_spawn_without_name() {
1303 let cli =
1304 Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"]).unwrap();
1305 assert!(matches!(
1306 &cli.command,
1307 Commands::Spawn { name, .. } if name.is_none()
1308 ));
1309 }
1310
1311 #[test]
1312 fn test_cli_parse_logs() {
1313 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1314 assert!(matches!(
1315 &cli.command,
1316 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1317 ));
1318 }
1319
1320 #[test]
1321 fn test_cli_parse_logs_with_lines() {
1322 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1323 assert!(matches!(
1324 &cli.command,
1325 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 50 && !follow
1326 ));
1327 }
1328
1329 #[test]
1330 fn test_cli_parse_logs_follow() {
1331 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1332 assert!(matches!(
1333 &cli.command,
1334 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1335 ));
1336 }
1337
1338 #[test]
1339 fn test_cli_parse_logs_follow_short() {
1340 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1341 assert!(matches!(
1342 &cli.command,
1343 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1344 ));
1345 }
1346
/// `kill <name>` parses into `Commands::Kill` carrying the session name.
#[test]
fn test_cli_parse_kill() {
    let parsed = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
    let Commands::Kill { name } = &parsed.command else {
        panic!("expected Commands::Kill, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
1355
/// `delete <name>` parses into `Commands::Delete` carrying the session name.
#[test]
fn test_cli_parse_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
    let Commands::Delete { name } = &parsed.command else {
        panic!("expected Commands::Delete, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
1364
/// `resume <name>` parses into `Commands::Resume` carrying the session name.
#[test]
fn test_cli_parse_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
    let Commands::Resume { name } = &parsed.command else {
        panic!("expected Commands::Resume, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
1373
/// `input <name> <text>` captures both the session name and the text.
#[test]
fn test_cli_parse_input() {
    let parsed = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
    let Commands::Input { name, text } = &parsed.command else {
        panic!("expected Commands::Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("yes"));
}
1382
/// The text argument of `input` is optional and parses to `None` when absent.
#[test]
fn test_cli_parse_input_no_text() {
    let parsed = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
    let Commands::Input { name, text } = &parsed.command else {
        panic!("expected Commands::Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(text.is_none());
}
1391
/// The `i` alias resolves to the same `Commands::Input` variant as `input`.
#[test]
fn test_cli_parse_input_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
    let Commands::Input { name, text } = &parsed.command else {
        panic!("expected Commands::Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("y"));
}
1400
/// A global `--node` value given before the subcommand replaces the default.
#[test]
fn test_cli_parse_custom_node() {
    let argv = ["pulpo", "--node", "win-pc:8080", "list"];
    let Cli { node, .. } = Cli::try_parse_from(argv).unwrap();
    assert_eq!(node, "win-pc:8080");
}
1406
/// `--version` is reported by clap as an `Err` whose kind is `DisplayVersion`.
#[test]
fn test_cli_version() {
    let version_exit = Cli::try_parse_from(["pulpo", "--version"]).unwrap_err();
    assert_eq!(
        version_exit.kind(),
        clap::error::ErrorKind::DisplayVersion
    );
}
1414
/// Invoking the binary with no subcommand is a parse error.
#[test]
fn test_cli_parse_no_subcommand_fails() {
    assert!(Cli::try_parse_from(["pulpo"]).is_err());
}
1420
/// The derived `Debug` output of a parsed `Cli` names the selected subcommand.
#[test]
fn test_cli_debug() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    let rendered = format!("{parsed:?}");
    assert!(rendered.contains("List"));
}
1427
/// The unit variant `Commands::List` debug-prints as its bare name.
#[test]
fn test_commands_debug() {
    assert_eq!(format!("{:?}", Commands::List), "List");
}
1433
/// Canned JSON for a single running session named `repo`, shared by the stub
/// server routes in this module. Assembled at compile time from shorter pieces
/// so each group of fields stays readable; the resulting string is one compact
/// JSON object with no added whitespace.
const TEST_SESSION_JSON: &str = concat!(
    r#"{"id":"00000000-0000-0000-0000-000000000001","#,
    r#""name":"repo","workdir":"/tmp/repo","provider":"claude","prompt":"Fix bug","#,
    r#""status":"running","mode":"interactive","#,
    r#""conversation_id":null,"exit_code":null,"backend_session_id":null,"#,
    r#""output_snapshot":null,"guard_config":null,"#,
    r#""intervention_reason":null,"intervention_at":null,"last_output_at":null,"#,
    r#""waiting_for_input":false,"#,
    r#""created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#,
);
1436
1437 fn test_create_response_json() -> String {
1439 format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1440 }
1441
1442 async fn start_test_server() -> String {
1444 use axum::http::StatusCode;
1445 use axum::{
1446 Json, Router,
1447 routing::{get, post},
1448 };
1449
1450 let create_json = test_create_response_json();
1451
1452 let app = Router::new()
1453 .route(
1454 "/api/v1/sessions",
1455 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1456 (StatusCode::CREATED, create_json.clone())
1457 }),
1458 )
1459 .route(
1460 "/api/v1/sessions/{id}",
1461 get(|| async { TEST_SESSION_JSON.to_owned() })
1462 .delete(|| async { StatusCode::NO_CONTENT }),
1463 )
1464 .route(
1465 "/api/v1/sessions/{id}/kill",
1466 post(|| async { StatusCode::NO_CONTENT }),
1467 )
1468 .route(
1469 "/api/v1/sessions/{id}/output",
1470 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1471 )
1472 .route(
1473 "/api/v1/peers",
1474 get(|| async {
1475 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1476 }),
1477 )
1478 .route(
1479 "/api/v1/sessions/{id}/resume",
1480 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1481 )
1482 .route(
1483 "/api/v1/sessions/{id}/interventions",
1484 get(|| async { "[]".to_owned() }),
1485 )
1486 .route(
1487 "/api/v1/sessions/{id}/input",
1488 post(|| async { StatusCode::NO_CONTENT }),
1489 );
1490
1491 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1492 let addr = listener.local_addr().unwrap();
1493 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1494 format!("127.0.0.1:{}", addr.port())
1495 }
1496
1497 #[tokio::test]
1498 async fn test_execute_list_success() {
1499 let node = start_test_server().await;
1500 let cli = Cli {
1501 node,
1502 token: None,
1503 command: Commands::List,
1504 };
1505 let result = execute(&cli).await.unwrap();
1506 assert_eq!(result, "No sessions.");
1507 }
1508
1509 #[tokio::test]
1510 async fn test_execute_nodes_success() {
1511 let node = start_test_server().await;
1512 let cli = Cli {
1513 node,
1514 token: None,
1515 command: Commands::Nodes,
1516 };
1517 let result = execute(&cli).await.unwrap();
1518 assert!(result.contains("test"));
1519 assert!(result.contains("(local)"));
1520 assert!(result.contains("NAME"));
1521 }
1522
1523 #[tokio::test]
1524 async fn test_execute_spawn_success() {
1525 let node = start_test_server().await;
1526 let cli = Cli {
1527 node,
1528 token: None,
1529 command: Commands::Spawn {
1530 workdir: Some("/tmp/repo".into()),
1531 name: None,
1532 provider: Some("claude".into()),
1533 auto: false,
1534 unrestricted: false,
1535 model: None,
1536 system_prompt: None,
1537 allowed_tools: None,
1538 ink: None,
1539 max_turns: None,
1540 max_budget: None,
1541 output_format: None,
1542 worktree: false,
1543 prompt: vec!["Fix".into(), "bug".into()],
1544 },
1545 };
1546 let result = execute(&cli).await.unwrap();
1547 assert!(result.contains("Created session"));
1548 assert!(result.contains("repo"));
1549 }
1550
1551 #[tokio::test]
1552 async fn test_execute_spawn_with_all_new_flags() {
1553 let node = start_test_server().await;
1554 let cli = Cli {
1555 node,
1556 token: None,
1557 command: Commands::Spawn {
1558 workdir: Some("/tmp/repo".into()),
1559 name: None,
1560 provider: Some("claude".into()),
1561 auto: false,
1562 unrestricted: false,
1563 model: Some("opus".into()),
1564 system_prompt: Some("Be helpful".into()),
1565 allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1566 ink: Some("coder".into()),
1567 max_turns: Some(5),
1568 max_budget: Some(2.5),
1569 output_format: Some("json".into()),
1570 worktree: false,
1571 prompt: vec!["Fix".into(), "bug".into()],
1572 },
1573 };
1574 let result = execute(&cli).await.unwrap();
1575 assert!(result.contains("Created session"));
1576 }
1577
1578 #[tokio::test]
1579 async fn test_execute_spawn_auto_mode() {
1580 let node = start_test_server().await;
1581 let cli = Cli {
1582 node,
1583 token: None,
1584 command: Commands::Spawn {
1585 workdir: Some("/tmp/repo".into()),
1586 name: None,
1587 provider: Some("claude".into()),
1588 auto: true,
1589 unrestricted: false,
1590 model: None,
1591 system_prompt: None,
1592 allowed_tools: None,
1593 ink: None,
1594 max_turns: None,
1595 max_budget: None,
1596 output_format: None,
1597 worktree: false,
1598 prompt: vec!["Do it".into()],
1599 },
1600 };
1601 let result = execute(&cli).await.unwrap();
1602 assert!(result.contains("Created session"));
1603 }
1604
1605 #[tokio::test]
1606 async fn test_execute_spawn_with_name() {
1607 let node = start_test_server().await;
1608 let cli = Cli {
1609 node,
1610 token: None,
1611 command: Commands::Spawn {
1612 workdir: Some("/tmp/repo".into()),
1613 name: Some("my-task".into()),
1614 provider: Some("claude".into()),
1615 auto: false,
1616 unrestricted: false,
1617 model: None,
1618 system_prompt: None,
1619 allowed_tools: None,
1620 ink: None,
1621 max_turns: None,
1622 max_budget: None,
1623 output_format: None,
1624 worktree: false,
1625 prompt: vec!["Fix".into(), "bug".into()],
1626 },
1627 };
1628 let result = execute(&cli).await.unwrap();
1629 assert!(result.contains("Created session"));
1630 }
1631
1632 #[tokio::test]
1633 async fn test_execute_kill_success() {
1634 let node = start_test_server().await;
1635 let cli = Cli {
1636 node,
1637 token: None,
1638 command: Commands::Kill {
1639 name: "test-session".into(),
1640 },
1641 };
1642 let result = execute(&cli).await.unwrap();
1643 assert!(result.contains("killed"));
1644 }
1645
1646 #[tokio::test]
1647 async fn test_execute_delete_success() {
1648 let node = start_test_server().await;
1649 let cli = Cli {
1650 node,
1651 token: None,
1652 command: Commands::Delete {
1653 name: "test-session".into(),
1654 },
1655 };
1656 let result = execute(&cli).await.unwrap();
1657 assert!(result.contains("deleted"));
1658 }
1659
1660 #[tokio::test]
1661 async fn test_execute_logs_success() {
1662 let node = start_test_server().await;
1663 let cli = Cli {
1664 node,
1665 token: None,
1666 command: Commands::Logs {
1667 name: "test-session".into(),
1668 lines: 50,
1669 follow: false,
1670 },
1671 };
1672 let result = execute(&cli).await.unwrap();
1673 assert!(result.contains("test output"));
1674 }
1675
1676 #[tokio::test]
1677 async fn test_execute_list_connection_refused() {
1678 let cli = Cli {
1679 node: "localhost:1".into(),
1680 token: None,
1681 command: Commands::List,
1682 };
1683 let result = execute(&cli).await;
1684 let err = result.unwrap_err().to_string();
1685 assert!(
1686 err.contains("Could not connect to pulpod"),
1687 "Expected friendly error, got: {err}"
1688 );
1689 assert!(err.contains("localhost:1"));
1690 }
1691
1692 #[tokio::test]
1693 async fn test_execute_nodes_connection_refused() {
1694 let cli = Cli {
1695 node: "localhost:1".into(),
1696 token: None,
1697 command: Commands::Nodes,
1698 };
1699 let result = execute(&cli).await;
1700 let err = result.unwrap_err().to_string();
1701 assert!(err.contains("Could not connect to pulpod"));
1702 }
1703
1704 #[tokio::test]
1705 async fn test_execute_kill_error_response() {
1706 use axum::{Router, http::StatusCode, routing::post};
1707
1708 let app = Router::new().route(
1709 "/api/v1/sessions/{id}/kill",
1710 post(|| async {
1711 (
1712 StatusCode::NOT_FOUND,
1713 "{\"error\":\"session not found: test-session\"}",
1714 )
1715 }),
1716 );
1717 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1718 let addr = listener.local_addr().unwrap();
1719 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1720 let node = format!("127.0.0.1:{}", addr.port());
1721
1722 let cli = Cli {
1723 node,
1724 token: None,
1725 command: Commands::Kill {
1726 name: "test-session".into(),
1727 },
1728 };
1729 let err = execute(&cli).await.unwrap_err();
1730 assert_eq!(err.to_string(), "session not found: test-session");
1731 }
1732
1733 #[tokio::test]
1734 async fn test_execute_delete_error_response() {
1735 use axum::{Router, http::StatusCode, routing::delete};
1736
1737 let app = Router::new().route(
1738 "/api/v1/sessions/{id}",
1739 delete(|| async {
1740 (
1741 StatusCode::CONFLICT,
1742 "{\"error\":\"cannot delete session in 'running' state\"}",
1743 )
1744 }),
1745 );
1746 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1747 let addr = listener.local_addr().unwrap();
1748 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1749 let node = format!("127.0.0.1:{}", addr.port());
1750
1751 let cli = Cli {
1752 node,
1753 token: None,
1754 command: Commands::Delete {
1755 name: "test-session".into(),
1756 },
1757 };
1758 let err = execute(&cli).await.unwrap_err();
1759 assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1760 }
1761
1762 #[tokio::test]
1763 async fn test_execute_logs_error_response() {
1764 use axum::{Router, http::StatusCode, routing::get};
1765
1766 let app = Router::new().route(
1767 "/api/v1/sessions/{id}/output",
1768 get(|| async {
1769 (
1770 StatusCode::NOT_FOUND,
1771 "{\"error\":\"session not found: ghost\"}",
1772 )
1773 }),
1774 );
1775 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1776 let addr = listener.local_addr().unwrap();
1777 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1778 let node = format!("127.0.0.1:{}", addr.port());
1779
1780 let cli = Cli {
1781 node,
1782 token: None,
1783 command: Commands::Logs {
1784 name: "ghost".into(),
1785 lines: 50,
1786 follow: false,
1787 },
1788 };
1789 let err = execute(&cli).await.unwrap_err();
1790 assert_eq!(err.to_string(), "session not found: ghost");
1791 }
1792
1793 #[tokio::test]
1794 async fn test_execute_resume_error_response() {
1795 use axum::{Router, http::StatusCode, routing::post};
1796
1797 let app = Router::new().route(
1798 "/api/v1/sessions/{id}/resume",
1799 post(|| async {
1800 (
1801 StatusCode::BAD_REQUEST,
1802 "{\"error\":\"session is not stale (status: running)\"}",
1803 )
1804 }),
1805 );
1806 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1807 let addr = listener.local_addr().unwrap();
1808 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1809 let node = format!("127.0.0.1:{}", addr.port());
1810
1811 let cli = Cli {
1812 node,
1813 token: None,
1814 command: Commands::Resume {
1815 name: "test-session".into(),
1816 },
1817 };
1818 let err = execute(&cli).await.unwrap_err();
1819 assert_eq!(err.to_string(), "session is not stale (status: running)");
1820 }
1821
1822 #[tokio::test]
1823 async fn test_execute_spawn_error_response() {
1824 use axum::{Router, http::StatusCode, routing::post};
1825
1826 let app = Router::new().route(
1827 "/api/v1/sessions",
1828 post(|| async {
1829 (
1830 StatusCode::INTERNAL_SERVER_ERROR,
1831 "{\"error\":\"failed to spawn session\"}",
1832 )
1833 }),
1834 );
1835 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1836 let addr = listener.local_addr().unwrap();
1837 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1838 let node = format!("127.0.0.1:{}", addr.port());
1839
1840 let cli = Cli {
1841 node,
1842 token: None,
1843 command: Commands::Spawn {
1844 workdir: Some("/tmp/repo".into()),
1845 name: None,
1846 provider: Some("claude".into()),
1847 auto: false,
1848 unrestricted: false,
1849 model: None,
1850 system_prompt: None,
1851 allowed_tools: None,
1852 ink: None,
1853 max_turns: None,
1854 max_budget: None,
1855 output_format: None,
1856 worktree: false,
1857 prompt: vec!["test".into()],
1858 },
1859 };
1860 let err = execute(&cli).await.unwrap_err();
1861 assert_eq!(err.to_string(), "failed to spawn session");
1862 }
1863
1864 #[tokio::test]
1865 async fn test_execute_interventions_error_response() {
1866 use axum::{Router, http::StatusCode, routing::get};
1867
1868 let app = Router::new().route(
1869 "/api/v1/sessions/{id}/interventions",
1870 get(|| async {
1871 (
1872 StatusCode::NOT_FOUND,
1873 "{\"error\":\"session not found: ghost\"}",
1874 )
1875 }),
1876 );
1877 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1878 let addr = listener.local_addr().unwrap();
1879 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1880 let node = format!("127.0.0.1:{}", addr.port());
1881
1882 let cli = Cli {
1883 node,
1884 token: None,
1885 command: Commands::Interventions {
1886 name: "ghost".into(),
1887 },
1888 };
1889 let err = execute(&cli).await.unwrap_err();
1890 assert_eq!(err.to_string(), "session not found: ghost");
1891 }
1892
1893 #[tokio::test]
1894 async fn test_execute_resume_success() {
1895 let node = start_test_server().await;
1896 let cli = Cli {
1897 node,
1898 token: None,
1899 command: Commands::Resume {
1900 name: "test-session".into(),
1901 },
1902 };
1903 let result = execute(&cli).await.unwrap();
1904 assert!(result.contains("Resumed session"));
1905 assert!(result.contains("repo"));
1906 }
1907
1908 #[tokio::test]
1909 async fn test_execute_input_success() {
1910 let node = start_test_server().await;
1911 let cli = Cli {
1912 node,
1913 token: None,
1914 command: Commands::Input {
1915 name: "test-session".into(),
1916 text: Some("yes".into()),
1917 },
1918 };
1919 let result = execute(&cli).await.unwrap();
1920 assert!(result.contains("Sent input to session test-session"));
1921 }
1922
1923 #[tokio::test]
1924 async fn test_execute_input_no_text() {
1925 let node = start_test_server().await;
1926 let cli = Cli {
1927 node,
1928 token: None,
1929 command: Commands::Input {
1930 name: "test-session".into(),
1931 text: None,
1932 },
1933 };
1934 let result = execute(&cli).await.unwrap();
1935 assert!(result.contains("Sent input to session test-session"));
1936 }
1937
1938 #[tokio::test]
1939 async fn test_execute_input_connection_refused() {
1940 let cli = Cli {
1941 node: "localhost:1".into(),
1942 token: None,
1943 command: Commands::Input {
1944 name: "test".into(),
1945 text: Some("y".into()),
1946 },
1947 };
1948 let result = execute(&cli).await;
1949 let err = result.unwrap_err().to_string();
1950 assert!(err.contains("Could not connect to pulpod"));
1951 }
1952
1953 #[tokio::test]
1954 async fn test_execute_input_error_response() {
1955 use axum::{Router, http::StatusCode, routing::post};
1956
1957 let app = Router::new().route(
1958 "/api/v1/sessions/{id}/input",
1959 post(|| async {
1960 (
1961 StatusCode::NOT_FOUND,
1962 "{\"error\":\"session not found: ghost\"}",
1963 )
1964 }),
1965 );
1966 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1967 let addr = listener.local_addr().unwrap();
1968 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1969 let node = format!("127.0.0.1:{}", addr.port());
1970
1971 let cli = Cli {
1972 node,
1973 token: None,
1974 command: Commands::Input {
1975 name: "ghost".into(),
1976 text: Some("y".into()),
1977 },
1978 };
1979 let err = execute(&cli).await.unwrap_err();
1980 assert_eq!(err.to_string(), "session not found: ghost");
1981 }
1982
1983 #[tokio::test]
1984 async fn test_execute_ui() {
1985 let cli = Cli {
1986 node: "localhost:7433".into(),
1987 token: None,
1988 command: Commands::Ui,
1989 };
1990 let result = execute(&cli).await.unwrap();
1991 assert!(result.contains("Opening"));
1992 assert!(result.contains("http://localhost:7433"));
1993 }
1994
1995 #[tokio::test]
1996 async fn test_execute_ui_custom_node() {
1997 let cli = Cli {
1998 node: "mac-mini:7433".into(),
1999 token: None,
2000 command: Commands::Ui,
2001 };
2002 let result = execute(&cli).await.unwrap();
2003 assert!(result.contains("http://mac-mini:7433"));
2004 }
2005
/// An empty session slice renders as the empty-state message.
#[test]
fn test_format_sessions_empty() {
    let rendered = format_sessions(&[]);
    assert_eq!(rendered, "No sessions.");
}
2010
/// A single running session renders a table containing the header plus the
/// session's name, status, provider and prompt.
#[test]
fn test_format_sessions_with_data() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let rows = [Session {
        id: Uuid::nil(),
        name: "my-api".into(),
        workdir: "/tmp/repo".into(),
        provider: Provider::Claude,
        prompt: "Fix the bug".into(),
        status: SessionStatus::Running,
        mode: SessionMode::Interactive,
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        waiting_for_input: false,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    }];
    let table = format_sessions(&rows);
    for expected in ["NAME", "my-api", "running", "claude", "Fix the bug"] {
        assert!(table.contains(expected));
    }
}
2053
/// A prompt longer than forty characters is rendered with an ellipsis.
#[test]
fn test_format_sessions_long_prompt_truncated() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let rows = [Session {
        id: Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        provider: Provider::Codex,
        prompt: "A very long prompt that exceeds forty characters in total length".into(),
        status: SessionStatus::Completed,
        mode: SessionMode::Autonomous,
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        waiting_for_input: false,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    }];
    let table = format_sessions(&rows);
    assert!(table.contains("..."));
}
2092
/// A running session flagged `waiting_for_input` is displayed as `waiting`
/// and the word `running` does not appear in the table.
#[test]
fn test_format_sessions_waiting_for_input() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let rows = [Session {
        id: Uuid::nil(),
        name: "blocked".into(),
        workdir: "/tmp".into(),
        provider: Provider::Claude,
        prompt: "Fix bug".into(),
        status: SessionStatus::Running,
        mode: SessionMode::Interactive,
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        waiting_for_input: true,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    }];
    let table = format_sessions(&rows);
    assert!(table.contains("waiting"));
    assert!(!table.contains("running"));
}
2132
/// The node table lists the local node (tagged `(local)`) and each peer,
/// including the peer's session count.
///
/// The peer's session count is deliberately a value (`42`) that occurs
/// nowhere else in the fixture. A single-digit count such as `3` would also be
/// matched by the peer address `win-pc:7433` or the memory size `16384`, so
/// the assertion could pass without the count being rendered at all.
#[test]
fn test_format_nodes() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let resp = PeersResponse {
        local: NodeInfo {
            name: "mac-mini".into(),
            hostname: "h".into(),
            os: "macos".into(),
            arch: "arm64".into(),
            cpus: 8,
            memory_mb: 16384,
            gpu: None,
        },
        peers: vec![PeerInfo {
            name: "win-pc".into(),
            address: "win-pc:7433".into(),
            status: PeerStatus::Online,
            node_info: None,
            session_count: Some(42),
            source: PeerSource::Configured,
        }],
    };
    let output = format_nodes(&resp);
    assert!(output.contains("mac-mini"));
    assert!(output.contains("(local)"));
    assert!(output.contains("win-pc"));
    assert!(
        output.contains("42"),
        "peer session count missing from output: {output}"
    );
}
2163
/// An offline peer with no session count shows its status and a `-` on the
/// third line of the rendered table.
#[test]
fn test_format_nodes_no_session_count() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "local".into(),
        hostname: "h".into(),
        os: "linux".into(),
        arch: "x86_64".into(),
        cpus: 4,
        memory_mb: 8192,
        gpu: None,
    };
    let offline_peer = PeerInfo {
        name: "peer".into(),
        address: "peer:7433".into(),
        status: PeerStatus::Offline,
        node_info: None,
        session_count: None,
        source: PeerSource::Configured,
    };
    let table = format_nodes(&PeersResponse {
        local,
        peers: vec![offline_peer],
    });
    assert!(table.contains("offline"));
    let third_line = table.lines().nth(2).unwrap();
    assert!(third_line.contains('-'));
}
2194
2195 #[tokio::test]
2196 async fn test_execute_resume_connection_refused() {
2197 let cli = Cli {
2198 node: "localhost:1".into(),
2199 token: None,
2200 command: Commands::Resume {
2201 name: "test".into(),
2202 },
2203 };
2204 let result = execute(&cli).await;
2205 let err = result.unwrap_err().to_string();
2206 assert!(err.contains("Could not connect to pulpod"));
2207 }
2208
2209 #[tokio::test]
2210 async fn test_execute_spawn_connection_refused() {
2211 let cli = Cli {
2212 node: "localhost:1".into(),
2213 token: None,
2214 command: Commands::Spawn {
2215 workdir: Some("/tmp".into()),
2216 name: None,
2217 provider: Some("claude".into()),
2218 auto: false,
2219 unrestricted: false,
2220 model: None,
2221 system_prompt: None,
2222 allowed_tools: None,
2223 ink: None,
2224 max_turns: None,
2225 max_budget: None,
2226 output_format: None,
2227 worktree: false,
2228 prompt: vec!["test".into()],
2229 },
2230 };
2231 let result = execute(&cli).await;
2232 let err = result.unwrap_err().to_string();
2233 assert!(err.contains("Could not connect to pulpod"));
2234 }
2235
2236 #[tokio::test]
2237 async fn test_execute_kill_connection_refused() {
2238 let cli = Cli {
2239 node: "localhost:1".into(),
2240 token: None,
2241 command: Commands::Kill {
2242 name: "test".into(),
2243 },
2244 };
2245 let result = execute(&cli).await;
2246 let err = result.unwrap_err().to_string();
2247 assert!(err.contains("Could not connect to pulpod"));
2248 }
2249
2250 #[tokio::test]
2251 async fn test_execute_delete_connection_refused() {
2252 let cli = Cli {
2253 node: "localhost:1".into(),
2254 token: None,
2255 command: Commands::Delete {
2256 name: "test".into(),
2257 },
2258 };
2259 let result = execute(&cli).await;
2260 let err = result.unwrap_err().to_string();
2261 assert!(err.contains("Could not connect to pulpod"));
2262 }
2263
2264 #[tokio::test]
2265 async fn test_execute_logs_connection_refused() {
2266 let cli = Cli {
2267 node: "localhost:1".into(),
2268 token: None,
2269 command: Commands::Logs {
2270 name: "test".into(),
2271 lines: 50,
2272 follow: false,
2273 },
2274 };
2275 let result = execute(&cli).await;
2276 let err = result.unwrap_err().to_string();
2277 assert!(err.contains("Could not connect to pulpod"));
2278 }
2279
2280 #[tokio::test]
2281 async fn test_friendly_error_connect() {
2282 let err = reqwest::Client::new()
2284 .get("http://127.0.0.1:1")
2285 .send()
2286 .await
2287 .unwrap_err();
2288 let friendly = friendly_error(&err, "test-node:1");
2289 let msg = friendly.to_string();
2290 assert!(
2291 msg.contains("Could not connect"),
2292 "Expected connect message, got: {msg}"
2293 );
2294 }
2295
2296 #[tokio::test]
2297 async fn test_friendly_error_other() {
2298 let err = reqwest::Client::new()
2300 .get("http://[::invalid::url")
2301 .send()
2302 .await
2303 .unwrap_err();
2304 let friendly = friendly_error(&err, "bad-host");
2305 let msg = friendly.to_string();
2306 assert!(
2307 msg.contains("Network error"),
2308 "Expected network error message, got: {msg}"
2309 );
2310 assert!(msg.contains("bad-host"));
2311 }
2312
/// `is_localhost` accepts `localhost`, IPv4 loopback and IPv6 loopback, with
/// or without a port, and rejects other hostnames and LAN addresses.
#[test]
fn test_is_localhost_variants() {
    let loopback = [
        "localhost:7433",
        "127.0.0.1:7433",
        "[::1]:7433",
        "::1",
        "localhost",
    ];
    for node in loopback {
        assert!(is_localhost(node));
    }

    let remote = ["mac-mini:7433", "192.168.1.100:7433"];
    for node in remote {
        assert!(!is_localhost(node));
    }
}
2325
/// With a token, `authed_get` attaches an `Authorization: Bearer <token>`
/// header to the request.
#[test]
fn test_authed_get_with_token() {
    let http = reqwest::Client::new();
    let request = authed_get(&http, "http://h:1/api".into(), Some("tok"))
        .build()
        .unwrap();
    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer tok");
}
2340
/// Without a token, `authed_get` sends no `Authorization` header.
#[test]
fn test_authed_get_without_token() {
    let http = reqwest::Client::new();
    let request = authed_get(&http, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert!(!request.headers().contains_key("authorization"));
}
2349
/// With a token, `authed_post` attaches an `Authorization: Bearer <token>`
/// header to the request.
#[test]
fn test_authed_post_with_token() {
    let http = reqwest::Client::new();
    let request = authed_post(&http, "http://h:1/api".into(), Some("secret"))
        .build()
        .unwrap();
    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer secret");
}
2364
/// Without a token, `authed_post` sends no `Authorization` header.
#[test]
fn test_authed_post_without_token() {
    let http = reqwest::Client::new();
    let request = authed_post(&http, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert!(!request.headers().contains_key("authorization"));
}
2373
/// With a token, `authed_delete` attaches an `Authorization: Bearer <token>`
/// header to the request.
#[test]
fn test_authed_delete_with_token() {
    let http = reqwest::Client::new();
    let request = authed_delete(&http, "http://h:1/api".into(), Some("del-tok"))
        .build()
        .unwrap();
    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer del-tok");
}
2388
/// Without a token, `authed_delete` sends no `Authorization` header.
#[test]
fn test_authed_delete_without_token() {
    let http = reqwest::Client::new();
    let request = authed_delete(&http, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert!(!request.headers().contains_key("authorization"));
}
2397
2398 #[tokio::test]
2399 async fn test_resolve_token_explicit() {
2400 let client = reqwest::Client::new();
2401 let token =
2402 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2403 assert_eq!(token, Some("my-tok".into()));
2404 }
2405
2406 #[tokio::test]
2407 async fn test_resolve_token_remote_no_explicit() {
2408 let client = reqwest::Client::new();
2409 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2410 assert_eq!(token, None);
2411 }
2412
2413 #[tokio::test]
2414 async fn test_resolve_token_localhost_auto_discover() {
2415 use axum::{Json, Router, routing::get};
2416
2417 let app = Router::new().route(
2418 "/api/v1/auth/token",
2419 get(|| async {
2420 Json(AuthTokenResponse {
2421 token: "discovered".into(),
2422 })
2423 }),
2424 );
2425 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2426 let addr = listener.local_addr().unwrap();
2427 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2428
2429 let node = format!("localhost:{}", addr.port());
2430 let base = base_url(&node);
2431 let client = reqwest::Client::new();
2432 let token = resolve_token(&client, &base, &node, None).await;
2433 assert_eq!(token, Some("discovered".into()));
2434 }
2435
2436 #[tokio::test]
2437 async fn test_discover_token_empty_returns_none() {
2438 use axum::{Json, Router, routing::get};
2439
2440 let app = Router::new().route(
2441 "/api/v1/auth/token",
2442 get(|| async {
2443 Json(AuthTokenResponse {
2444 token: String::new(),
2445 })
2446 }),
2447 );
2448 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2449 let addr = listener.local_addr().unwrap();
2450 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2451
2452 let base = format!("http://127.0.0.1:{}", addr.port());
2453 let client = reqwest::Client::new();
2454 assert_eq!(discover_token(&client, &base).await, None);
2455 }
2456
2457 #[tokio::test]
2458 async fn test_discover_token_unreachable_returns_none() {
2459 let client = reqwest::Client::new();
2460 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2461 }
2462
/// A global `--token` value is captured on the parsed `Cli`.
#[test]
fn test_cli_parse_with_token() {
    let argv = ["pulpo", "--token", "my-secret", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert_eq!(parsed.token.as_deref(), Some("my-secret"));
}
2468
/// Without `--token`, the parsed `token` field is `None`.
#[test]
fn test_cli_parse_without_token() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    assert!(parsed.token.is_none());
}
2474
2475 #[tokio::test]
2476 async fn test_execute_with_explicit_token_sends_header() {
2477 use axum::{Router, extract::Request, http::StatusCode, routing::get};
2478
2479 let app = Router::new().route(
2480 "/api/v1/sessions",
2481 get(|req: Request| async move {
2482 let auth = req
2483 .headers()
2484 .get("authorization")
2485 .and_then(|v| v.to_str().ok())
2486 .unwrap_or("");
2487 assert_eq!(auth, "Bearer test-token");
2488 (StatusCode::OK, "[]".to_owned())
2489 }),
2490 );
2491 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2492 let addr = listener.local_addr().unwrap();
2493 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2494 let node = format!("127.0.0.1:{}", addr.port());
2495
2496 let cli = Cli {
2497 node,
2498 token: Some("test-token".into()),
2499 command: Commands::List,
2500 };
2501 let result = execute(&cli).await.unwrap();
2502 assert_eq!(result, "No sessions.");
2503 }
2504
/// `pulpo interventions <name>` parses into `Commands::Interventions`.
#[test]
fn test_cli_parse_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
    match &parsed.command {
        Commands::Interventions { name } => assert_eq!(name, "my-session"),
        _ => panic!("expected Interventions command"),
    }
}
2515
/// An empty event slice renders the placeholder message.
#[test]
fn test_format_interventions_empty() {
    let rendered = format_interventions(&[]);
    assert_eq!(rendered, "No intervention events.");
}
2520
/// The rendered table contains the column headers and the event details.
#[test]
fn test_format_interventions_with_data() {
    // Both events belong to the same session; only id/reason/timestamp vary.
    let event = |id, reason: &str, created_at: &str| InterventionEventResponse {
        id,
        session_id: "sess-1".into(),
        reason: reason.into(),
        created_at: created_at.into(),
    };
    let events = vec![
        event(1, "Memory exceeded threshold", "2026-01-01T00:00:00Z"),
        event(2, "Idle for 10 minutes", "2026-01-02T00:00:00Z"),
    ];

    let rendered = format_interventions(&events);
    let expected_fragments = [
        "ID",
        "TIMESTAMP",
        "REASON",
        "Memory exceeded threshold",
        "Idle for 10 minutes",
        "2026-01-01T00:00:00Z",
    ];
    for fragment in expected_fragments {
        assert!(rendered.contains(fragment));
    }
}
2545
2546 #[tokio::test]
2547 async fn test_execute_interventions_empty() {
2548 let node = start_test_server().await;
2549 let cli = Cli {
2550 node,
2551 token: None,
2552 command: Commands::Interventions {
2553 name: "my-session".into(),
2554 },
2555 };
2556 let result = execute(&cli).await.unwrap();
2557 assert_eq!(result, "No intervention events.");
2558 }
2559
2560 #[tokio::test]
2561 async fn test_execute_interventions_with_data() {
2562 use axum::{Router, routing::get};
2563
2564 let app = Router::new().route(
2565 "/api/v1/sessions/{id}/interventions",
2566 get(|| async {
2567 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2568 .to_owned()
2569 }),
2570 );
2571 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2572 let addr = listener.local_addr().unwrap();
2573 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2574 let node = format!("127.0.0.1:{}", addr.port());
2575
2576 let cli = Cli {
2577 node,
2578 token: None,
2579 command: Commands::Interventions {
2580 name: "test".into(),
2581 },
2582 };
2583 let result = execute(&cli).await.unwrap();
2584 assert!(result.contains("OOM"));
2585 assert!(result.contains("2026-01-01T00:00:00Z"));
2586 }
2587
2588 #[tokio::test]
2589 async fn test_execute_interventions_connection_refused() {
2590 let cli = Cli {
2591 node: "localhost:1".into(),
2592 token: None,
2593 command: Commands::Interventions {
2594 name: "test".into(),
2595 },
2596 };
2597 let result = execute(&cli).await;
2598 let err = result.unwrap_err().to_string();
2599 assert!(err.contains("Could not connect to pulpod"));
2600 }
2601
/// The attach command is `tmux attach-session -t <name>`.
#[test]
fn test_build_attach_command() {
    let command = build_attach_command("my-session");

    let program = command.get_program();
    assert_eq!(program, "tmux");

    let argv: Vec<&std::ffi::OsStr> = command.get_args().collect();
    let expected = vec!["attach-session", "-t", "my-session"];
    assert_eq!(argv, expected);
}
2611
/// `pulpo attach <name>` parses into `Commands::Attach`.
#[test]
fn test_cli_parse_attach() {
    let parsed = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
    match &parsed.command {
        Commands::Attach { name } => assert_eq!(name, "my-session"),
        _ => panic!("expected Attach command"),
    }
}
2620
/// The `a` alias resolves to `Commands::Attach`.
#[test]
fn test_cli_parse_attach_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
    match &parsed.command {
        Commands::Attach { name } => assert_eq!(name, "my-session"),
        _ => panic!("expected Attach command"),
    }
}
2629
2630 #[tokio::test]
2631 async fn test_execute_attach_success() {
2632 let node = start_test_server().await;
2633 let cli = Cli {
2634 node,
2635 token: None,
2636 command: Commands::Attach {
2637 name: "test-session".into(),
2638 },
2639 };
2640 let result = execute(&cli).await.unwrap();
2641 assert!(result.contains("Detached from session test-session"));
2642 }
2643
2644 #[tokio::test]
2645 async fn test_execute_attach_with_backend_session_id() {
2646 use axum::{Router, routing::get};
2647 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","provider":"claude","prompt":"test","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2648 let app = Router::new().route(
2649 "/api/v1/sessions/{id}",
2650 get(move || async move { session_json.to_owned() }),
2651 );
2652 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2653 let addr = listener.local_addr().unwrap();
2654 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2655
2656 let cli = Cli {
2657 node: format!("127.0.0.1:{}", addr.port()),
2658 token: None,
2659 command: Commands::Attach {
2660 name: "my-session".into(),
2661 },
2662 };
2663 let result = execute(&cli).await.unwrap();
2664 assert!(result.contains("Detached from session my-session"));
2665 }
2666
2667 #[tokio::test]
2668 async fn test_execute_attach_connection_refused() {
2669 let cli = Cli {
2670 node: "localhost:1".into(),
2671 token: None,
2672 command: Commands::Attach {
2673 name: "test-session".into(),
2674 },
2675 };
2676 let result = execute(&cli).await;
2677 let err = result.unwrap_err().to_string();
2678 assert!(err.contains("Could not connect to pulpod"));
2679 }
2680
2681 #[tokio::test]
2682 async fn test_execute_attach_error_response() {
2683 use axum::{Router, http::StatusCode, routing::get};
2684 let app = Router::new().route(
2685 "/api/v1/sessions/{id}",
2686 get(|| async {
2687 (
2688 StatusCode::NOT_FOUND,
2689 r#"{"error":"session not found"}"#.to_owned(),
2690 )
2691 }),
2692 );
2693 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2694 let addr = listener.local_addr().unwrap();
2695 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2696
2697 let cli = Cli {
2698 node: format!("127.0.0.1:{}", addr.port()),
2699 token: None,
2700 command: Commands::Attach {
2701 name: "nonexistent".into(),
2702 },
2703 };
2704 let result = execute(&cli).await;
2705 let err = result.unwrap_err().to_string();
2706 assert!(err.contains("session not found"));
2707 }
2708
2709 #[tokio::test]
2710 async fn test_execute_attach_stale_session() {
2711 use axum::{Router, routing::get};
2712 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2713 let app = Router::new().route(
2714 "/api/v1/sessions/{id}",
2715 get(move || async move { session_json.to_owned() }),
2716 );
2717 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2718 let addr = listener.local_addr().unwrap();
2719 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2720
2721 let cli = Cli {
2722 node: format!("127.0.0.1:{}", addr.port()),
2723 token: None,
2724 command: Commands::Attach {
2725 name: "stale-sess".into(),
2726 },
2727 };
2728 let result = execute(&cli).await;
2729 let err = result.unwrap_err().to_string();
2730 assert!(err.contains("stale"));
2731 assert!(err.contains("pulpo resume"));
2732 }
2733
2734 #[tokio::test]
2735 async fn test_execute_attach_dead_session() {
2736 use axum::{Router, routing::get};
2737 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2738 let app = Router::new().route(
2739 "/api/v1/sessions/{id}",
2740 get(move || async move { session_json.to_owned() }),
2741 );
2742 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2743 let addr = listener.local_addr().unwrap();
2744 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2745
2746 let cli = Cli {
2747 node: format!("127.0.0.1:{}", addr.port()),
2748 token: None,
2749 command: Commands::Attach {
2750 name: "dead-sess".into(),
2751 },
2752 };
2753 let result = execute(&cli).await;
2754 let err = result.unwrap_err().to_string();
2755 assert!(err.contains("dead"));
2756 assert!(err.contains("cannot attach"));
2757 }
2758
/// The `s` alias resolves to `Commands::Spawn`.
#[test]
fn test_cli_parse_alias_spawn() {
    let argv = ["pulpo", "s", "--workdir", "/tmp", "Do it"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_spawn = matches!(parsed.command, Commands::Spawn { .. });
    assert!(is_spawn);
}
2766
/// The `ls` alias resolves to `Commands::List`.
#[test]
fn test_cli_parse_alias_list() {
    let parsed = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
    let is_list = matches!(parsed.command, Commands::List);
    assert!(is_list);
}
2772
/// The `l` alias resolves to `Commands::Logs`.
#[test]
fn test_cli_parse_alias_logs() {
    let parsed = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
    match &parsed.command {
        Commands::Logs { name, .. } => assert_eq!(name, "my-session"),
        _ => panic!("expected Logs command"),
    }
}
2781
/// The `k` alias resolves to `Commands::Kill`.
#[test]
fn test_cli_parse_alias_kill() {
    let parsed = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
    match &parsed.command {
        Commands::Kill { name } => assert_eq!(name, "my-session"),
        _ => panic!("expected Kill command"),
    }
}
2790
/// The `rm` alias resolves to `Commands::Delete`.
#[test]
fn test_cli_parse_alias_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
    match &parsed.command {
        Commands::Delete { name } => assert_eq!(name, "my-session"),
        _ => panic!("expected Delete command"),
    }
}
2799
/// The `r` alias resolves to `Commands::Resume`.
#[test]
fn test_cli_parse_alias_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
    match &parsed.command {
        Commands::Resume { name } => assert_eq!(name, "my-session"),
        _ => panic!("expected Resume command"),
    }
}
2808
/// The `n` alias resolves to `Commands::Nodes`.
#[test]
fn test_cli_parse_alias_nodes() {
    let parsed = Cli::try_parse_from(["pulpo", "n"]).unwrap();
    let is_nodes = matches!(parsed.command, Commands::Nodes);
    assert!(is_nodes);
}
2814
/// The `iv` alias resolves to `Commands::Interventions`.
#[test]
fn test_cli_parse_alias_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
    match &parsed.command {
        Commands::Interventions { name } => assert_eq!(name, "my-session"),
        _ => panic!("expected Interventions command"),
    }
}
2823
/// A JSON body with an `error` field is reduced to that field's text.
#[test]
fn test_api_error_json() {
    let body = r#"{"error":"session not found: foo"}"#;
    let message = api_error(body).to_string();
    assert_eq!(message, "session not found: foo");
}
2829
/// A non-JSON body is passed through verbatim as the error message.
#[test]
fn test_api_error_plain_text() {
    let message = api_error("plain text error").to_string();
    assert_eq!(message, "plain text error");
}
2835
/// With no previous output, the whole new text is returned.
#[test]
fn test_diff_output_empty_prev() {
    let fresh = "line1\nline2\n";
    assert_eq!(diff_output("", fresh), "line1\nline2\n");
}
2842
/// Identical previous and new output produce an empty diff.
#[test]
fn test_diff_output_identical() {
    let unchanged = "line1\nline2";
    let delta = diff_output(unchanged, "line1\nline2");
    assert_eq!(delta, "");
}
2847
/// When new lines are appended, only the appended lines are returned.
#[test]
fn test_diff_output_new_lines_appended() {
    let delta = diff_output("line1\nline2", "line1\nline2\nline3\nline4");
    assert_eq!(delta, "line3\nline4");
}
2854
/// The window scrolled by one line: `line1` dropped off, `line4` is new.
#[test]
fn test_diff_output_scrolled_window() {
    let before = "line1\nline2\nline3";
    let after = "line2\nline3\nline4";
    let delta = diff_output(before, after);
    assert_eq!(delta, "line4");
}
2862
/// With no shared lines at all, the whole new text is returned.
#[test]
fn test_diff_output_completely_different() {
    let delta = diff_output("aaa\nbbb", "xxx\nyyy");
    assert_eq!(delta, "xxx\nyyy");
}
2869
/// `common` occurs in both texts, but the line before it differs (`aaa` vs `zzz`),
/// so the whole new text is expected back.
#[test]
fn test_diff_output_last_line_matches_but_overlap_fails() {
    let before = "aaa\ncommon";
    let after = "zzz\ncommon\nnew_line";
    let delta = diff_output(before, after);
    assert_eq!(delta, "zzz\ncommon\nnew_line");
}
2880
/// An empty new text yields an empty diff.
#[test]
fn test_diff_output_new_empty() {
    let delta = diff_output("line1", "");
    assert_eq!(delta, "");
}
2885
2886 async fn start_follow_test_server() -> String {
2890 use axum::{Router, extract::Path, extract::Query, routing::get};
2891 use std::sync::Arc;
2892 use std::sync::atomic::{AtomicUsize, Ordering};
2893
2894 let call_count = Arc::new(AtomicUsize::new(0));
2895 let output_count = call_count.clone();
2896 let status_count = Arc::new(AtomicUsize::new(0));
2897 let status_count_inner = status_count.clone();
2898
2899 let app = Router::new()
2900 .route(
2901 "/api/v1/sessions/{id}/output",
2902 get(
2903 move |_path: Path<String>,
2904 _query: Query<std::collections::HashMap<String, String>>| {
2905 let count = output_count.clone();
2906 async move {
2907 let n = count.fetch_add(1, Ordering::SeqCst);
2908 let output = match n {
2909 0 => "line1\nline2".to_owned(),
2910 1 => "line1\nline2\nline3".to_owned(),
2911 _ => "line2\nline3\nline4".to_owned(),
2912 };
2913 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2914 }
2915 },
2916 ),
2917 )
2918 .route(
2919 "/api/v1/sessions/{id}",
2920 get(move |_path: Path<String>| {
2921 let count = status_count_inner.clone();
2922 async move {
2923 let n = count.fetch_add(1, Ordering::SeqCst);
2924 let status = if n < 2 { "running" } else { "completed" };
2925 format!(
2926 r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
2927 )
2928 }
2929 }),
2930 );
2931
2932 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2933 let addr = listener.local_addr().unwrap();
2934 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2935 format!("http://127.0.0.1:{}", addr.port())
2936 }
2937
2938 #[tokio::test]
2939 async fn test_follow_logs_polls_and_exits_on_completed() {
2940 let base = start_follow_test_server().await;
2941 let client = reqwest::Client::new();
2942 let mut buf = Vec::new();
2943
2944 follow_logs(&client, &base, "test", 100, None, &mut buf)
2945 .await
2946 .unwrap();
2947
2948 let output = String::from_utf8(buf).unwrap();
2949 assert!(output.contains("line1"));
2951 assert!(output.contains("line2"));
2952 assert!(output.contains("line3"));
2953 assert!(output.contains("line4"));
2954 }
2955
2956 #[tokio::test]
2957 async fn test_execute_logs_follow_success() {
2958 let base = start_follow_test_server().await;
2959 let node = base.strip_prefix("http://").unwrap().to_owned();
2961
2962 let cli = Cli {
2963 node,
2964 token: None,
2965 command: Commands::Logs {
2966 name: "test".into(),
2967 lines: 100,
2968 follow: true,
2969 },
2970 };
2971 let result = execute(&cli).await.unwrap();
2973 assert_eq!(result, "");
2974 }
2975
2976 #[tokio::test]
2977 async fn test_execute_logs_follow_connection_refused() {
2978 let cli = Cli {
2979 node: "localhost:1".into(),
2980 token: None,
2981 command: Commands::Logs {
2982 name: "test".into(),
2983 lines: 50,
2984 follow: true,
2985 },
2986 };
2987 let result = execute(&cli).await;
2988 let err = result.unwrap_err().to_string();
2989 assert!(
2990 err.contains("Could not connect to pulpod"),
2991 "Expected friendly error, got: {err}"
2992 );
2993 }
2994
2995 #[tokio::test]
2996 async fn test_follow_logs_exits_on_dead() {
2997 use axum::{Router, extract::Path, extract::Query, routing::get};
2998
2999 let app = Router::new()
3000 .route(
3001 "/api/v1/sessions/{id}/output",
3002 get(
3003 |_path: Path<String>,
3004 _query: Query<std::collections::HashMap<String, String>>| async {
3005 r#"{"output":"some output"}"#.to_owned()
3006 },
3007 ),
3008 )
3009 .route(
3010 "/api/v1/sessions/{id}",
3011 get(|_path: Path<String>| async {
3012 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3013 }),
3014 );
3015
3016 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3017 let addr = listener.local_addr().unwrap();
3018 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3019 let base = format!("http://127.0.0.1:{}", addr.port());
3020
3021 let client = reqwest::Client::new();
3022 let mut buf = Vec::new();
3023 follow_logs(&client, &base, "test", 100, None, &mut buf)
3024 .await
3025 .unwrap();
3026
3027 let output = String::from_utf8(buf).unwrap();
3028 assert!(output.contains("some output"));
3029 }
3030
3031 #[tokio::test]
3032 async fn test_follow_logs_exits_on_stale() {
3033 use axum::{Router, extract::Path, extract::Query, routing::get};
3034
3035 let app = Router::new()
3036 .route(
3037 "/api/v1/sessions/{id}/output",
3038 get(
3039 |_path: Path<String>,
3040 _query: Query<std::collections::HashMap<String, String>>| async {
3041 r#"{"output":"stale output"}"#.to_owned()
3042 },
3043 ),
3044 )
3045 .route(
3046 "/api/v1/sessions/{id}",
3047 get(|_path: Path<String>| async {
3048 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3049 }),
3050 );
3051
3052 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3053 let addr = listener.local_addr().unwrap();
3054 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3055 let base = format!("http://127.0.0.1:{}", addr.port());
3056
3057 let client = reqwest::Client::new();
3058 let mut buf = Vec::new();
3059 follow_logs(&client, &base, "test", 100, None, &mut buf)
3060 .await
3061 .unwrap();
3062
3063 let output = String::from_utf8(buf).unwrap();
3064 assert!(output.contains("stale output"));
3065 }
3066
3067 #[tokio::test]
3068 async fn test_execute_logs_follow_non_reqwest_error() {
3069 use axum::{Router, extract::Path, extract::Query, routing::get};
3070
3071 let app = Router::new()
3073 .route(
3074 "/api/v1/sessions/{id}/output",
3075 get(
3076 |_path: Path<String>,
3077 _query: Query<std::collections::HashMap<String, String>>| async {
3078 r#"{"output":"initial"}"#.to_owned()
3079 },
3080 ),
3081 )
3082 .route(
3083 "/api/v1/sessions/{id}",
3084 get(|_path: Path<String>| async { "not valid json".to_owned() }),
3085 );
3086
3087 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3088 let addr = listener.local_addr().unwrap();
3089 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3090 let node = format!("127.0.0.1:{}", addr.port());
3091
3092 let cli = Cli {
3093 node,
3094 token: None,
3095 command: Commands::Logs {
3096 name: "test".into(),
3097 lines: 100,
3098 follow: true,
3099 },
3100 };
3101 let err = execute(&cli).await.unwrap_err();
3102 let msg = err.to_string();
3104 assert!(
3105 msg.contains("expected ident"),
3106 "Expected serde parse error, got: {msg}"
3107 );
3108 }
3109
/// `--max-turns`, `--max-budget` and `--output-format` land in `Commands::Spawn`.
#[test]
fn test_cli_parse_spawn_with_guardrails() {
    let argv = [
        "pulpo",
        "spawn",
        "--workdir",
        "/tmp",
        "--max-turns",
        "10",
        "--max-budget",
        "5.5",
        "--output-format",
        "json",
        "Do it",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    match &parsed.command {
        Commands::Spawn {
            max_turns,
            max_budget,
            output_format,
            ..
        } => {
            assert!(*max_turns == Some(10));
            assert!(*max_budget == Some(5.5));
            assert!(output_format.as_deref() == Some("json"));
        }
        _ => panic!("expected Spawn command"),
    }
}
3133
3134 #[tokio::test]
3135 async fn test_fetch_session_status_connection_error() {
3136 let client = reqwest::Client::new();
3137 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3138 assert!(result.is_err());
3139 }
3140
/// The generated crontab line is the cron expression, the `pulpo spawn` invocation
/// and a trailing `#pulpo:<name>` tag, terminated by a newline.
#[test]
fn test_build_crontab_line() {
    let expected = "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n";
    let actual = build_crontab_line(
        "nightly-review",
        "0 3 * * *",
        "/home/me/repo",
        "claude",
        "Review PRs",
        "localhost:7433",
    );
    assert_eq!(actual, expected);
}
3158
/// Installing keeps the existing entries first and ends with the newly added line.
#[test]
fn test_crontab_install_success() {
    let existing = "# existing cron\n0 * * * * echo hi\n";
    let new_line = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";

    let updated = crontab_install(existing, "my-job", new_line).unwrap();

    assert!(updated.starts_with("# existing cron\n"));
    assert!(updated.ends_with("#pulpo:my-job\n"));
    assert!(updated.contains("echo hi"));
}
3168
/// Installing a schedule whose name is already tagged in the crontab is rejected.
#[test]
fn test_crontab_install_duplicate_error() {
    let existing = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let new_line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
    let message = crontab_install(existing, "my-job", new_line)
        .unwrap_err()
        .to_string();
    assert!(message.contains("already exists"));
}
3176
/// An empty crontab lists no schedules.
#[test]
fn test_crontab_list_empty() {
    let listing = crontab_list("");
    assert_eq!(listing, "No pulpo schedules.");
}
3181
/// Entries without a pulpo tag are not listed.
#[test]
fn test_crontab_list_no_pulpo_entries() {
    let listing = crontab_list("0 * * * * echo hi\n");
    assert_eq!(listing, "No pulpo schedules.");
}
3186
/// An active entry is listed with headers, its name, its cron expression and `no`.
#[test]
fn test_crontab_list_with_entries() {
    let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
    let listing = crontab_list(crontab);
    for fragment in ["NAME", "CRON", "PAUSED", "nightly", "0 3 * * *", "no"] {
        assert!(listing.contains(fragment));
    }
}
3198
/// A tagged entry with a leading `#` is listed as paused (`yes`).
#[test]
fn test_crontab_list_paused_entry() {
    let listing = crontab_list("#0 3 * * * pulpo spawn task #pulpo:paused-job\n");
    for fragment in ["paused-job", "yes"] {
        assert!(listing.contains(fragment));
    }
}
3206
/// A tagged line without a full cron expression is still listed, with a `?` in the output.
#[test]
fn test_crontab_list_short_line() {
    let listing = crontab_list("badcron #pulpo:broken\n");
    assert!(listing.contains("broken"));
    assert!(listing.contains('?'));
}
3215
/// Removing a schedule drops its line and leaves unrelated entries alone.
#[test]
fn test_crontab_remove_success() {
    let existing = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let updated = crontab_remove(existing, "my-job").unwrap();
    assert!(updated.contains("echo hi"));
    let still_present = updated.contains("my-job");
    assert!(!still_present);
}
3223
/// Removing an unknown schedule is an error mentioning "not found".
#[test]
fn test_crontab_remove_not_found() {
    let message = crontab_remove("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found"));
}
3230
/// Pausing prefixes the entry with `#` and keeps its tag.
#[test]
fn test_crontab_pause_success() {
    let existing = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let updated = crontab_pause(existing, "my-job").unwrap();
    assert!(updated.starts_with('#'));
    assert!(updated.contains("#pulpo:my-job"));
}
3238
/// Pausing an unknown schedule reports "not found or already paused".
#[test]
fn test_crontab_pause_not_found() {
    let message = crontab_pause("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found or already paused"));
}
3245
/// Pausing an entry that already starts with `#` is an error.
#[test]
fn test_crontab_pause_already_paused() {
    let existing = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let message = crontab_pause(existing, "my-job").unwrap_err().to_string();
    assert!(message.contains("already paused"));
}
3252
/// Resuming removes the leading `#` and keeps the tag.
#[test]
fn test_crontab_resume_success() {
    let existing = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let updated = crontab_resume(existing, "my-job").unwrap();
    let still_commented = updated.starts_with('#');
    assert!(!still_commented);
    assert!(updated.contains("#pulpo:my-job"));
}
3260
/// Resuming an unknown schedule reports "not found or not paused".
#[test]
fn test_crontab_resume_not_found() {
    let message = crontab_resume("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found or not paused"));
}
3267
/// Resuming an entry that is not commented out is an error.
#[test]
fn test_crontab_resume_not_paused() {
    let existing = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let message = crontab_resume(existing, "my-job").unwrap_err().to_string();
    assert!(message.contains("not paused"));
}
3274
/// `schedule install` captures name, cron, workdir and the prompt words; the
/// provider comes out as `claude` when `--provider` is not given.
#[test]
fn test_cli_parse_schedule_install() {
    let argv = [
        "pulpo",
        "schedule",
        "install",
        "nightly",
        "0 3 * * *",
        "--workdir",
        "/tmp/repo",
        "Review",
        "PRs",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    match &parsed.command {
        Commands::Schedule {
            action:
                ScheduleAction::Install {
                    name,
                    cron,
                    workdir,
                    provider,
                    prompt,
                },
        } => {
            assert!(name == "nightly");
            assert!(cron == "0 3 * * *");
            assert!(workdir == "/tmp/repo");
            assert!(provider == "claude");
            assert!(prompt == &["Review", "PRs"]);
        }
        _ => panic!("expected Schedule install command"),
    }
}
3299
/// `schedule list` parses into `ScheduleAction::List`.
#[test]
fn test_cli_parse_schedule_list() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
    let is_list = matches!(
        parsed.command,
        Commands::Schedule {
            action: ScheduleAction::List
        }
    );
    assert!(is_list);
}
3310
/// `schedule remove <name>` parses into `ScheduleAction::Remove`.
#[test]
fn test_cli_parse_schedule_remove() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
    match &parsed.command {
        Commands::Schedule {
            action: ScheduleAction::Remove { name },
        } => assert_eq!(name, "nightly"),
        _ => panic!("expected Schedule remove command"),
    }
}
3321
/// `schedule pause <name>` parses into `ScheduleAction::Pause`.
#[test]
fn test_cli_parse_schedule_pause() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
    match &parsed.command {
        Commands::Schedule {
            action: ScheduleAction::Pause { name },
        } => assert_eq!(name, "nightly"),
        _ => panic!("expected Schedule pause command"),
    }
}
3332
/// `schedule resume <name>` parses into `ScheduleAction::Resume`.
#[test]
fn test_cli_parse_schedule_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
    match &parsed.command {
        Commands::Schedule {
            action: ScheduleAction::Resume { name },
        } => assert_eq!(name, "nightly"),
        _ => panic!("expected Schedule resume command"),
    }
}
3343
/// The `sched` alias resolves to `Commands::Schedule`.
#[test]
fn test_cli_parse_schedule_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
    let is_list = matches!(
        parsed.command,
        Commands::Schedule {
            action: ScheduleAction::List
        }
    );
    assert!(is_list);
}
3354
/// `schedule ls` resolves to `ScheduleAction::List`.
#[test]
fn test_cli_parse_schedule_list_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
    let is_list = matches!(
        parsed.command,
        Commands::Schedule {
            action: ScheduleAction::List
        }
    );
    assert!(is_list);
}
3365
/// `schedule rm <name>` resolves to `ScheduleAction::Remove`.
#[test]
fn test_cli_parse_schedule_remove_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
    match &parsed.command {
        Commands::Schedule {
            action: ScheduleAction::Remove { name },
        } => assert_eq!(name, "nightly"),
        _ => panic!("expected Schedule remove command"),
    }
}
3376
/// `--provider` overrides the provider of `schedule install`.
#[test]
fn test_cli_parse_schedule_install_custom_provider() {
    let argv = [
        "pulpo",
        "schedule",
        "install",
        "daily",
        "0 9 * * *",
        "--workdir",
        "/tmp",
        "--provider",
        "codex",
        "Run tests",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    match &parsed.command {
        Commands::Schedule {
            action: ScheduleAction::Install { provider, .. },
        } => assert_eq!(provider, "codex"),
        _ => panic!("expected Schedule install command"),
    }
}
3399
3400 #[tokio::test]
3401 async fn test_execute_schedule_via_execute() {
3402 let node = start_test_server().await;
3404 let cli = Cli {
3405 node,
3406 token: None,
3407 command: Commands::Schedule {
3408 action: ScheduleAction::List,
3409 },
3410 };
3411 let result = execute(&cli).await;
3412 assert!(result.is_ok() || result.is_err());
3414 }
3415
/// `ScheduleAction::List` has the `Debug` representation `List`.
#[test]
fn test_schedule_action_debug() {
    let rendered = format!("{:?}", ScheduleAction::List);
    assert_eq!(rendered, "List");
}
3421
/// Bare `pulpo knowledge` parses into `Commands::Knowledge`.
#[test]
fn test_cli_parse_knowledge() {
    let parsed = Cli::try_parse_from(["pulpo", "knowledge"]).unwrap();
    let is_knowledge = matches!(parsed.command, Commands::Knowledge { .. });
    assert!(is_knowledge);
}
3429
/// The `kn` alias resolves to `Commands::Knowledge`.
#[test]
fn test_cli_parse_knowledge_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "kn"]).unwrap();
    let is_knowledge = matches!(parsed.command, Commands::Knowledge { .. });
    assert!(is_knowledge);
}
3435
/// `--kind`, `--repo`, `--ink` and `--limit` are all captured by `Commands::Knowledge`.
#[test]
fn test_cli_parse_knowledge_with_filters() {
    let argv = [
        "pulpo",
        "knowledge",
        "--kind",
        "failure",
        "--repo",
        "/tmp/repo",
        "--ink",
        "coder",
        "--limit",
        "5",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Knowledge {
        kind,
        repo,
        ink,
        limit,
        ..
    } = &parsed.command
    else {
        panic!("expected Knowledge command");
    };
    assert_eq!(kind.as_deref(), Some("failure"));
    assert_eq!(repo.as_deref(), Some("/tmp/repo"));
    assert_eq!(ink.as_deref(), Some("coder"));
    assert_eq!(*limit, 5);
}
3467
/// `--context` sets the flag and can be combined with `--repo`.
#[test]
fn test_cli_parse_knowledge_context() {
    let argv = ["pulpo", "knowledge", "--context", "--repo", "/tmp/repo"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Knowledge { context, repo, .. } = &parsed.command else {
        panic!("expected Knowledge command");
    };
    assert!(*context);
    assert_eq!(repo.as_deref(), Some("/tmp/repo"));
}
3480
/// An empty item slice renders the placeholder message.
#[test]
fn test_format_knowledge_empty() {
    let rendered = format_knowledge(&[]);
    assert_eq!(rendered, "No knowledge found.");
}
3485
/// The rendered table shows headers, both kinds, a title, the repo and the relevance.
#[test]
fn test_format_knowledge_items() {
    use chrono::Utc;
    use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
    use uuid::Uuid;

    // A scoped summary entry.
    let summary = Knowledge {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        kind: KnowledgeKind::Summary,
        scope_repo: Some("/tmp/repo".into()),
        scope_ink: Some("coder".into()),
        title: "Fixed the auth bug".into(),
        body: "Details".into(),
        tags: vec!["claude".into(), "completed".into()],
        relevance: 0.7,
        created_at: Utc::now(),
    };
    // An unscoped failure entry.
    let failure = Knowledge {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        kind: KnowledgeKind::Failure,
        scope_repo: None,
        scope_ink: None,
        title: "OOM crash during build".into(),
        body: "Details".into(),
        tags: vec!["failure".into()],
        relevance: 0.9,
        created_at: Utc::now(),
    };
    let items = vec![summary, failure];

    let rendered = format_knowledge(&items);
    let expected_fragments = [
        "KIND",
        "TITLE",
        "summary",
        "failure",
        "Fixed the auth bug",
        "repo",
        "0.70",
    ];
    for fragment in expected_fragments {
        assert!(rendered.contains(fragment));
    }
}
3528
/// Formats a single entry with a very long title and checks that the output
/// contains an ellipsis (`…`), which per the test name marks truncation.
#[test]
fn test_format_knowledge_long_title_truncated() {
    use chrono::Utc;
    use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
    use uuid::Uuid;

    let long_title =
        "A very long title that exceeds the maximum display width for knowledge items in the CLI";

    let item = Knowledge {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        kind: KnowledgeKind::Summary,
        scope_repo: Some("/repo".into()),
        scope_ink: None,
        title: long_title.into(),
        body: "Body".into(),
        tags: Vec::new(),
        relevance: 0.5,
        created_at: Utc::now(),
    };

    let output = format_knowledge(&[item]);
    assert!(output.contains('…'));
}
3551
/// Parses `pulpo knowledge --get abc-123` and checks that the value lands in
/// the `get` field of `Commands::Knowledge`.
#[test]
fn test_cli_parse_knowledge_get() {
    let argv = ["pulpo", "knowledge", "--get", "abc-123"];
    let parsed = Cli::try_parse_from(argv).unwrap();

    let Commands::Knowledge { get, .. } = &parsed.command else {
        panic!("expected Knowledge command");
    };

    assert_eq!(get.as_deref(), Some("abc-123"));
}
3562
/// Parses `pulpo knowledge --delete abc-123` and checks that the value lands
/// in the `delete` field of `Commands::Knowledge`.
#[test]
fn test_cli_parse_knowledge_delete() {
    let argv = ["pulpo", "knowledge", "--delete", "abc-123"];
    let parsed = Cli::try_parse_from(argv).unwrap();

    let Commands::Knowledge { delete, .. } = &parsed.command else {
        panic!("expected Knowledge command");
    };

    assert_eq!(delete.as_deref(), Some("abc-123"));
}
3573
/// Parses `pulpo knowledge --push` and checks that the `push` flag of
/// `Commands::Knowledge` is set.
#[test]
fn test_cli_parse_knowledge_push() {
    let argv = ["pulpo", "knowledge", "--push"];
    let parsed = Cli::try_parse_from(argv).unwrap();

    let Commands::Knowledge { push, .. } = &parsed.command else {
        panic!("expected Knowledge command");
    };

    assert!(*push);
}
3584
/// Formats a single entry that has neither a repo nor an ink scope and checks
/// that the output contains a `-` character.
#[test]
fn test_format_knowledge_no_repo() {
    use chrono::Utc;
    use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
    use uuid::Uuid;

    let unscoped = Knowledge {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        kind: KnowledgeKind::Summary,
        scope_repo: None,
        scope_ink: None,
        title: "Global finding".into(),
        body: "Body".into(),
        tags: Vec::new(),
        relevance: 0.5,
        created_at: Utc::now(),
    };

    let output = format_knowledge(&[unscoped]);

    // NOTE(review): presumably `-` is the placeholder shown for a missing
    // repo — confirm against `format_knowledge`; any other hyphen in the
    // output would also satisfy this check.
    assert!(output.contains('-'));
}
3607}