1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4 AuthTokenResponse, CreateSessionResponse, InterventionEventResponse, KnowledgeDeleteResponse,
5 KnowledgeItemResponse, KnowledgePushResponse, KnowledgeResponse, PeersResponse,
6};
7use pulpo_common::knowledge::Knowledge;
8use pulpo_common::session::Session;
9
// Top-level command-line arguments of the `pulpo` client, parsed by clap.
// Plain `//` comments are used on purpose: clap turns `///` doc comments into help text.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version
)]
pub struct Cli {
    // Address of the node to talk to; `base_url` prefixes it with `http://`.
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    // Explicit auth token. When absent, `resolve_token` tries to discover one,
    // but only for nodes that `is_localhost` accepts.
    #[arg(long)]
    pub token: Option<String>,

    // The subcommand to run; dispatched in `execute`.
    #[command(subcommand)]
    pub command: Commands,
}
28
// Subcommands of the `pulpo` CLI; `execute` dispatches on these.
// Plain `//` comments are used on purpose: clap turns `///` doc comments into help text.
#[derive(Subcommand, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    // Look the session up on the node, then attach the local terminal to it.
    #[command(visible_alias = "a")]
    Attach {
        name: String,
    },

    // Send text to a session; `execute` sends a single newline when `text` is omitted.
    #[command(visible_alias = "i")]
    Input {
        name: String,
        text: Option<String>,
    },

    // Create a new session by POSTing to `/api/v1/sessions`.
    #[command(visible_alias = "s")]
    Spawn {
        // Defaults to the current working directory (or "." if that cannot be read).
        #[arg(long)]
        workdir: Option<String>,

        #[arg(long)]
        name: Option<String>,

        #[arg(long)]
        provider: Option<String>,

        // Selects mode "autonomous" instead of "interactive".
        #[arg(long)]
        auto: bool,

        #[arg(long)]
        unrestricted: bool,

        #[arg(long)]
        model: Option<String>,

        #[arg(long)]
        system_prompt: Option<String>,

        // Comma-separated on the command line, sent as a JSON array.
        #[arg(long, value_delimiter = ',')]
        allowed_tools: Option<Vec<String>>,

        #[arg(long)]
        ink: Option<String>,

        #[arg(long)]
        max_turns: Option<u32>,

        // Sent to the API under the key `max_budget_usd`.
        #[arg(long)]
        max_budget: Option<f64>,

        #[arg(long)]
        output_format: Option<String>,

        #[arg(long)]
        worktree: bool,

        #[arg(long)]
        conversation_id: Option<String>,

        // Remaining positional words; joined with single spaces to form the prompt.
        prompt: Vec<String>,
    },

    // Print a table of sessions.
    #[command(visible_alias = "ls")]
    List,

    // Print session output; with `--follow`, keep polling until the session ends.
    #[command(visible_alias = "l")]
    Logs {
        name: String,

        // Passed to the output endpoint as the `lines` query parameter.
        #[arg(long, default_value = "100")]
        lines: usize,

        #[arg(short, long)]
        follow: bool,
    },

    // POST to the session's `/kill` endpoint.
    #[command(visible_alias = "k")]
    Kill {
        name: String,
    },

    // Send an HTTP DELETE for the session.
    #[command(visible_alias = "rm")]
    Delete {
        name: String,
    },

    // POST to the session's `/resume` endpoint.
    #[command(visible_alias = "r")]
    Resume {
        name: String,
    },

    // Print a table of the local node and its peers.
    #[command(visible_alias = "n")]
    Nodes,

    // Print the intervention events recorded for a session.
    #[command(visible_alias = "iv")]
    Interventions {
        name: String,
    },

    // Open the node's base URL in the system browser.
    Ui,

    // Query or manage knowledge items. `--get`, `--delete` and `--push` are checked
    // in that order and short-circuit the listing query.
    #[command(visible_alias = "kn")]
    Knowledge {
        // Listing filter, sent as `session_id`.
        #[arg(long)]
        session: Option<String>,

        #[arg(long)]
        kind: Option<String>,

        // Sent as `repo` for listings and as `workdir` when `--context` is set.
        #[arg(long)]
        repo: Option<String>,

        #[arg(long)]
        ink: Option<String>,

        #[arg(long, default_value = "20")]
        limit: usize,

        // Query the `/knowledge/context` endpoint instead of the plain listing.
        #[arg(long)]
        context: bool,

        // Fetch a single item by id.
        #[arg(long)]
        get: Option<String>,

        // Delete a single item by id.
        #[arg(long)]
        delete: Option<String>,

        // POST to `/knowledge/push` and print the returned message.
        #[arg(long)]
        push: bool,
    },

    // Manage crontab-backed schedules; see `ScheduleAction`.
    #[command(visible_alias = "sched")]
    Schedule {
        #[command(subcommand)]
        action: ScheduleAction,
    },
}
212
// Actions of `pulpo schedule`; each one reads and/or rewrites the user's crontab
// (see `execute_schedule`).
// Plain `//` comments are used on purpose: clap turns `///` doc comments into help text.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    // Append a crontab entry that runs `pulpo spawn --auto` with the given prompt.
    Install {
        // Schedule name; stored in the entry's trailing `#pulpo:<name>` tag.
        name: String,
        // Cron expression, written verbatim at the start of the crontab line.
        cron: String,
        #[arg(long)]
        workdir: String,
        #[arg(long, default_value = "claude")]
        provider: String,
        // Remaining positional words; joined with single spaces to form the prompt.
        prompt: Vec<String>,
    },
    // List the crontab entries that carry a pulpo tag.
    #[command(alias = "ls")]
    List,
    // Delete the tagged entry.
    #[command(alias = "rm")]
    Remove {
        name: String,
    },
    // Comment the tagged entry out with a leading `#`.
    Pause {
        name: String,
    },
    // Strip the leading `#` from a paused entry.
    Resume {
        name: String,
    },
}
250
/// Builds the HTTP base URL for a node address by prefixing it with `http://`.
pub fn base_url(node: &str) -> String {
    let mut url = String::from("http://");
    url.push_str(node);
    url
}
255
/// JSON body of the session output endpoint, as deserialized by `fetch_output`.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// The captured session output text.
    output: String,
}
261
262fn format_sessions(sessions: &[Session]) -> String {
264 if sessions.is_empty() {
265 return "No sessions.".into();
266 }
267 let mut lines = vec![format!(
268 "{:<20} {:<12} {:<10} {:<14} {}",
269 "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
270 )];
271 for s in sessions {
272 let prompt_display = if s.prompt.len() > 40 {
273 format!("{}...", &s.prompt[..37])
274 } else {
275 s.prompt.clone()
276 };
277 let status_display = if s.waiting_for_input {
278 "waiting".to_owned()
279 } else {
280 s.status.to_string()
281 };
282 lines.push(format!(
283 "{:<20} {:<12} {:<10} {:<14} {}",
284 s.name, status_display, s.provider, s.mode, prompt_display
285 ));
286 }
287 lines.join("\n")
288}
289
290fn format_nodes(resp: &PeersResponse) -> String {
292 let mut lines = vec![format!(
293 "{:<20} {:<25} {:<10} {}",
294 "NAME", "ADDRESS", "STATUS", "SESSIONS"
295 )];
296 lines.push(format!(
297 "{:<20} {:<25} {:<10} {}",
298 resp.local.name, "(local)", "online", "-"
299 ));
300 for p in &resp.peers {
301 let sessions = p
302 .session_count
303 .map_or_else(|| "-".into(), |c| c.to_string());
304 lines.push(format!(
305 "{:<20} {:<25} {:<10} {}",
306 p.name, p.address, p.status, sessions
307 ));
308 }
309 lines.join("\n")
310}
311
312fn format_interventions(events: &[InterventionEventResponse]) -> String {
314 if events.is_empty() {
315 return "No intervention events.".into();
316 }
317 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
318 for e in events {
319 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
320 }
321 lines.join("\n")
322}
323
324fn format_knowledge(items: &[Knowledge]) -> String {
326 if items.is_empty() {
327 return "No knowledge found.".into();
328 }
329 let mut lines = vec![format!(
330 "{:<10} {:<40} {:<10} {:<6} {}",
331 "KIND", "TITLE", "REPO", "REL", "TAGS"
332 )];
333 for k in items {
334 let title = if k.title.len() > 38 {
335 format!("{}…", &k.title[..37])
336 } else {
337 k.title.clone()
338 };
339 let repo = k
340 .scope_repo
341 .as_deref()
342 .and_then(|r| r.rsplit('/').next())
343 .unwrap_or("-");
344 let tags = k.tags.join(",");
345 lines.push(format!(
346 "{:<10} {:<40} {:<10} {:<6.2} {}",
347 k.kind, title, repo, k.relevance, tags
348 ));
349 }
350 lines.join("\n")
351}
352
/// Builds (without running) the command that opens `url` in the system browser:
/// `open` on macOS, `xdg-open` on every other platform.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    #[cfg(target_os = "macos")]
    const OPENER: &str = "open";
    // Linux and any other non-macOS target use xdg-open.
    #[cfg(not(target_os = "macos"))]
    const OPENER: &str = "xdg-open";

    let mut opener = std::process::Command::new(OPENER);
    opener.arg(url);
    opener
}
376
377#[cfg(not(coverage))]
379fn open_browser(url: &str) -> Result<()> {
380 build_open_command(url).status()?;
381 Ok(())
382}
383
384#[cfg(coverage)]
386fn open_browser(_url: &str) -> Result<()> {
387 Ok(())
388}
389
/// Builds (without running) `tmux attach-session -t <backend_session_id>`.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    let mut tmux = std::process::Command::new("tmux");
    tmux.arg("attach-session").arg("-t").arg(backend_session_id);
    tmux
}
398
399#[cfg(not(any(test, coverage)))]
401fn attach_session(backend_session_id: &str) -> Result<()> {
402 let status = build_attach_command(backend_session_id).status()?;
403 if !status.success() {
404 anyhow::bail!("attach failed with {status}");
405 }
406 Ok(())
407}
408
409#[cfg(any(test, coverage))]
411#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
412fn attach_session(_backend_session_id: &str) -> Result<()> {
413 Ok(())
414}
415
416fn api_error(text: &str) -> anyhow::Error {
418 serde_json::from_str::<serde_json::Value>(text)
419 .ok()
420 .and_then(|v| v["error"].as_str().map(String::from))
421 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
422}
423
424async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
426 if resp.status().is_success() {
427 Ok(resp.text().await?)
428 } else {
429 let text = resp.text().await?;
430 Err(api_error(&text))
431 }
432}
433
434fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
436 if err.is_connect() {
437 anyhow::anyhow!(
438 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
439 )
440 } else {
441 anyhow::anyhow!("Network error connecting to {node}: {err}")
442 }
443}
444
/// Reports whether a node address refers to the local machine.
///
/// Accepts `localhost` and `127.0.0.1` (with or without `:port`), the bare IPv6
/// loopback `::1`, and anything starting with the bracketed form `[::1]`.
fn is_localhost(node: &str) -> bool {
    // Everything before the first ':' is the host part; without ':' it is the whole string.
    let host = node.split_once(':').map_or(node, |(before_port, _)| before_port);
    matches!(host, "localhost" | "127.0.0.1") || node == "::1" || node.starts_with("[::1]")
}
450
451async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
453 let resp = client
454 .get(format!("{base}/api/v1/auth/token"))
455 .send()
456 .await
457 .ok()?;
458 let body: AuthTokenResponse = resp.json().await.ok()?;
459 if body.token.is_empty() {
460 None
461 } else {
462 Some(body.token)
463 }
464}
465
466async fn resolve_token(
468 client: &reqwest::Client,
469 base: &str,
470 node: &str,
471 explicit: Option<&str>,
472) -> Option<String> {
473 if let Some(t) = explicit {
474 return Some(t.to_owned());
475 }
476 if is_localhost(node) {
477 return discover_token(client, base).await;
478 }
479 None
480}
481
482fn authed_get(
484 client: &reqwest::Client,
485 url: String,
486 token: Option<&str>,
487) -> reqwest::RequestBuilder {
488 let req = client.get(url);
489 if let Some(t) = token {
490 req.bearer_auth(t)
491 } else {
492 req
493 }
494}
495
496fn authed_post(
498 client: &reqwest::Client,
499 url: String,
500 token: Option<&str>,
501) -> reqwest::RequestBuilder {
502 let req = client.post(url);
503 if let Some(t) = token {
504 req.bearer_auth(t)
505 } else {
506 req
507 }
508}
509
510fn authed_delete(
512 client: &reqwest::Client,
513 url: String,
514 token: Option<&str>,
515) -> reqwest::RequestBuilder {
516 let req = client.delete(url);
517 if let Some(t) = token {
518 req.bearer_auth(t)
519 } else {
520 req
521 }
522}
523
524async fn fetch_output(
526 client: &reqwest::Client,
527 base: &str,
528 name: &str,
529 lines: usize,
530 token: Option<&str>,
531) -> Result<String> {
532 let resp = authed_get(
533 client,
534 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
535 token,
536 )
537 .send()
538 .await?;
539 let text = ok_or_api_error(resp).await?;
540 let output: OutputResponse = serde_json::from_str(&text)?;
541 Ok(output.output)
542}
543
544async fn fetch_session_status(
546 client: &reqwest::Client,
547 base: &str,
548 name: &str,
549 token: Option<&str>,
550) -> Result<String> {
551 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
552 .send()
553 .await?;
554 let text = ok_or_api_error(resp).await?;
555 let session: Session = serde_json::from_str(&text)?;
556 Ok(session.status.to_string())
557}
558
/// Returns the part of `new` that was not already present at the end of `prev`.
///
/// Both arguments are successive snapshots of the same output. The function looks for
/// the latest position in `new` where the tail of `prev` re-appears line for line and
/// returns everything after it:
///
/// * `prev` empty: all of `new` is returned.
/// * `new` has no lines: `""` is returned.
/// * no overlap found: all of `new` is returned.
///
/// Lines are compared without their terminators, so `\n` and `\r\n` endings both work;
/// the returned slice keeps the original terminators of `new`.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // A non-empty string always yields at least one line; fall back to "everything is
    // new" rather than indexing blindly.
    let Some(&last_prev) = prev_lines.last() else {
        return new;
    };

    // Byte offset just past each line of `new`, terminator included. `split_inclusive`
    // yields exactly one segment per `lines()` item, so the indices line up, and the
    // offsets stay correct for `\r\n` endings (which `len() + 1` would under-count).
    let line_ends: Vec<usize> = new
        .split_inclusive('\n')
        .scan(0usize, |end, segment| {
            *end += segment.len();
            Some(*end)
        })
        .collect();

    // Scan from the bottom so the most recent occurrence of the anchor line wins.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Compare as many trailing lines of `prev` as fit above (and including) line `i`.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail == new_overlap {
            // Everything after line `i` is new; this is "" when `i` is the last line.
            return line_ends.get(i).and_then(|&end| new.get(end..)).unwrap_or("");
        }
    }

    new
}
601
602async fn follow_logs(
604 client: &reqwest::Client,
605 base: &str,
606 name: &str,
607 lines: usize,
608 token: Option<&str>,
609 writer: &mut (dyn std::io::Write + Send),
610) -> Result<()> {
611 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
612 write!(writer, "{prev_output}")?;
613
614 loop {
615 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
616
617 let status = fetch_session_status(client, base, name, token).await?;
619 let is_terminal = status == "completed" || status == "dead" || status == "stale";
620
621 let new_output = fetch_output(client, base, name, lines, token).await?;
623
624 let diff = diff_output(&prev_output, &new_output);
625 if !diff.is_empty() {
626 write!(writer, "{diff}")?;
627 }
628 prev_output = new_output;
629
630 if is_terminal {
631 break;
632 }
633 }
634 Ok(())
635}
636
/// Marker that precedes the schedule name at the end of every crontab line written by
/// `build_crontab_line`; the `crontab_*` helpers use it to find pulpo-managed entries.
#[cfg_attr(coverage, allow(dead_code))]
const CRONTAB_TAG: &str = "#pulpo:";
641
642#[cfg(not(coverage))]
644fn read_crontab() -> Result<String> {
645 let output = std::process::Command::new("crontab").arg("-l").output()?;
646 if output.status.success() {
647 Ok(String::from_utf8_lossy(&output.stdout).to_string())
648 } else {
649 Ok(String::new())
650 }
651}
652
653#[cfg(not(coverage))]
655fn write_crontab(content: &str) -> Result<()> {
656 use std::io::Write;
657 let mut child = std::process::Command::new("crontab")
658 .arg("-")
659 .stdin(std::process::Stdio::piped())
660 .spawn()?;
661 child
662 .stdin
663 .as_mut()
664 .unwrap()
665 .write_all(content.as_bytes())?;
666 let status = child.wait()?;
667 if !status.success() {
668 anyhow::bail!("crontab write failed");
669 }
670 Ok(())
671}
672
673#[cfg_attr(coverage, allow(dead_code))]
675fn build_crontab_line(
676 name: &str,
677 cron: &str,
678 workdir: &str,
679 provider: &str,
680 prompt: &str,
681 node: &str,
682) -> String {
683 format!(
684 "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
685 )
686}
687
688#[cfg_attr(coverage, allow(dead_code))]
690fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
691 let tag = format!("{CRONTAB_TAG}{name}");
692 if crontab.contains(&tag) {
693 anyhow::bail!("schedule \"{name}\" already exists — remove it first");
694 }
695 let mut result = crontab.to_owned();
696 result.push_str(line);
697 Ok(result)
698}
699
700#[cfg_attr(coverage, allow(dead_code))]
702fn crontab_list(crontab: &str) -> String {
703 let entries: Vec<&str> = crontab
704 .lines()
705 .filter(|l| l.contains(CRONTAB_TAG))
706 .collect();
707 if entries.is_empty() {
708 return "No pulpo schedules.".into();
709 }
710 let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
711 for entry in entries {
712 let paused = entry.starts_with('#');
713 let raw = entry.trim_start_matches('#').trim();
714 let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
715 let parts: Vec<&str> = raw.splitn(6, ' ').collect();
716 let cron_expr = if parts.len() >= 5 {
717 parts[..5].join(" ")
718 } else {
719 "?".into()
720 };
721 lines.push(format!(
722 "{:<20} {:<15} {}",
723 name,
724 cron_expr,
725 if paused { "yes" } else { "no" }
726 ));
727 }
728 lines.join("\n")
729}
730
731#[cfg_attr(coverage, allow(dead_code))]
733fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
734 use std::fmt::Write;
735 let tag = format!("{CRONTAB_TAG}{name}");
736 let filtered =
737 crontab
738 .lines()
739 .filter(|l| !l.contains(&tag))
740 .fold(String::new(), |mut acc, l| {
741 writeln!(acc, "{l}").unwrap();
742 acc
743 });
744 if filtered.len() == crontab.len() {
745 anyhow::bail!("schedule \"{name}\" not found");
746 }
747 Ok(filtered)
748}
749
750#[cfg_attr(coverage, allow(dead_code))]
752fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
753 use std::fmt::Write;
754 let tag = format!("{CRONTAB_TAG}{name}");
755 let mut found = false;
756 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
757 if l.contains(&tag) && !l.starts_with('#') {
758 found = true;
759 writeln!(acc, "#{l}").unwrap();
760 } else {
761 writeln!(acc, "{l}").unwrap();
762 }
763 acc
764 });
765 if !found {
766 anyhow::bail!("schedule \"{name}\" not found or already paused");
767 }
768 Ok(updated)
769}
770
771#[cfg_attr(coverage, allow(dead_code))]
773fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
774 use std::fmt::Write;
775 let tag = format!("{CRONTAB_TAG}{name}");
776 let mut found = false;
777 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
778 if l.contains(&tag) && l.starts_with('#') {
779 found = true;
780 writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
781 } else {
782 writeln!(acc, "{l}").unwrap();
783 }
784 acc
785 });
786 if !found {
787 anyhow::bail!("schedule \"{name}\" not found or not paused");
788 }
789 Ok(updated)
790}
791
792#[cfg(not(coverage))]
794fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
795 match action {
796 ScheduleAction::Install {
797 name,
798 cron,
799 workdir,
800 provider,
801 prompt,
802 } => {
803 let crontab = read_crontab()?;
804 let joined_prompt = prompt.join(" ");
805 let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
806 let updated = crontab_install(&crontab, name, &line)?;
807 write_crontab(&updated)?;
808 Ok(format!("Installed schedule \"{name}\""))
809 }
810 ScheduleAction::List => {
811 let crontab = read_crontab()?;
812 Ok(crontab_list(&crontab))
813 }
814 ScheduleAction::Remove { name } => {
815 let crontab = read_crontab()?;
816 let updated = crontab_remove(&crontab, name)?;
817 write_crontab(&updated)?;
818 Ok(format!("Removed schedule \"{name}\""))
819 }
820 ScheduleAction::Pause { name } => {
821 let crontab = read_crontab()?;
822 let updated = crontab_pause(&crontab, name)?;
823 write_crontab(&updated)?;
824 Ok(format!("Paused schedule \"{name}\""))
825 }
826 ScheduleAction::Resume { name } => {
827 let crontab = read_crontab()?;
828 let updated = crontab_resume(&crontab, name)?;
829 write_crontab(&updated)?;
830 Ok(format!("Resumed schedule \"{name}\""))
831 }
832 }
833}
834
835#[cfg(coverage)]
837fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
838 Ok(String::new())
839}
840
841#[allow(clippy::too_many_lines)]
843pub async fn execute(cli: &Cli) -> Result<String> {
844 let url = base_url(&cli.node);
845 let client = reqwest::Client::new();
846 let node = &cli.node;
847 let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;
848
849 match &cli.command {
850 Commands::Attach { name } => {
851 let resp = authed_get(
853 &client,
854 format!("{url}/api/v1/sessions/{name}"),
855 token.as_deref(),
856 )
857 .send()
858 .await
859 .map_err(|e| friendly_error(&e, node))?;
860 let text = ok_or_api_error(resp).await?;
861 let session: Session = serde_json::from_str(&text)?;
862 match session.status.to_string().as_str() {
863 "stale" => {
864 anyhow::bail!(
865 "Session \"{name}\" is stale (agent process died). Resume it first:\n pulpo resume {name}"
866 );
867 }
868 "completed" | "dead" => {
869 anyhow::bail!(
870 "Session \"{name}\" is {} — cannot attach to a finished session.",
871 session.status
872 );
873 }
874 _ => {}
875 }
876 let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
877 attach_session(&backend_id)?;
878 Ok(format!("Detached from session {name}."))
879 }
880 Commands::Input { name, text } => {
881 let input_text = text.as_deref().unwrap_or("\n");
882 let body = serde_json::json!({ "text": input_text });
883 let resp = authed_post(
884 &client,
885 format!("{url}/api/v1/sessions/{name}/input"),
886 token.as_deref(),
887 )
888 .json(&body)
889 .send()
890 .await
891 .map_err(|e| friendly_error(&e, node))?;
892 ok_or_api_error(resp).await?;
893 Ok(format!("Sent input to session {name}."))
894 }
895 Commands::List => {
896 let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
897 .send()
898 .await
899 .map_err(|e| friendly_error(&e, node))?;
900 let text = ok_or_api_error(resp).await?;
901 let sessions: Vec<Session> = serde_json::from_str(&text)?;
902 Ok(format_sessions(&sessions))
903 }
904 Commands::Nodes => {
905 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
906 .send()
907 .await
908 .map_err(|e| friendly_error(&e, node))?;
909 let text = ok_or_api_error(resp).await?;
910 let resp: PeersResponse = serde_json::from_str(&text)?;
911 Ok(format_nodes(&resp))
912 }
913 Commands::Spawn {
914 workdir,
915 name,
916 provider,
917 auto,
918 unrestricted,
919 model,
920 system_prompt,
921 allowed_tools,
922 ink,
923 max_turns,
924 max_budget,
925 output_format,
926 worktree,
927 conversation_id,
928 prompt,
929 } => {
930 let prompt_text = prompt.join(" ");
931 let mode = if *auto { "autonomous" } else { "interactive" };
932 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
934 std::env::current_dir()
935 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
936 });
937 let mut body = serde_json::json!({
938 "workdir": resolved_workdir,
939 "mode": mode,
940 });
941 if !prompt_text.is_empty() {
943 body["prompt"] = serde_json::json!(prompt_text);
944 }
945 if let Some(p) = provider {
947 body["provider"] = serde_json::json!(p);
948 }
949 if *unrestricted {
950 body["unrestricted"] = serde_json::json!(true);
951 }
952 if let Some(n) = name {
953 body["name"] = serde_json::json!(n);
954 }
955 if let Some(m) = model {
956 body["model"] = serde_json::json!(m);
957 }
958 if let Some(sp) = system_prompt {
959 body["system_prompt"] = serde_json::json!(sp);
960 }
961 if let Some(tools) = allowed_tools {
962 body["allowed_tools"] = serde_json::json!(tools);
963 }
964 if let Some(p) = ink {
965 body["ink"] = serde_json::json!(p);
966 }
967 if let Some(mt) = max_turns {
968 body["max_turns"] = serde_json::json!(mt);
969 }
970 if let Some(mb) = max_budget {
971 body["max_budget_usd"] = serde_json::json!(mb);
972 }
973 if let Some(of) = output_format {
974 body["output_format"] = serde_json::json!(of);
975 }
976 if *worktree {
977 body["worktree"] = serde_json::json!(true);
978 }
979 if let Some(cid) = conversation_id {
980 body["conversation_id"] = serde_json::json!(cid);
981 }
982 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
983 .json(&body)
984 .send()
985 .await
986 .map_err(|e| friendly_error(&e, node))?;
987 let text = ok_or_api_error(resp).await?;
988 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
989 let mut msg = format!(
990 "Created session \"{}\" ({})",
991 resp.session.name, resp.session.id
992 );
993 for w in &resp.warnings {
994 use std::fmt::Write;
995 let _ = write!(msg, "\n Warning: {w}");
996 }
997 Ok(msg)
998 }
999 Commands::Kill { name } => {
1000 let resp = authed_post(
1001 &client,
1002 format!("{url}/api/v1/sessions/{name}/kill"),
1003 token.as_deref(),
1004 )
1005 .send()
1006 .await
1007 .map_err(|e| friendly_error(&e, node))?;
1008 ok_or_api_error(resp).await?;
1009 Ok(format!("Session {name} killed."))
1010 }
1011 Commands::Delete { name } => {
1012 let resp = authed_delete(
1013 &client,
1014 format!("{url}/api/v1/sessions/{name}"),
1015 token.as_deref(),
1016 )
1017 .send()
1018 .await
1019 .map_err(|e| friendly_error(&e, node))?;
1020 ok_or_api_error(resp).await?;
1021 Ok(format!("Session {name} deleted."))
1022 }
1023 Commands::Logs {
1024 name,
1025 lines,
1026 follow,
1027 } => {
1028 if *follow {
1029 let mut stdout = std::io::stdout();
1030 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1031 .await
1032 .map_err(|e| {
1033 match e.downcast::<reqwest::Error>() {
1035 Ok(re) => friendly_error(&re, node),
1036 Err(other) => other,
1037 }
1038 })?;
1039 Ok(String::new())
1040 } else {
1041 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1042 .await
1043 .map_err(|e| match e.downcast::<reqwest::Error>() {
1044 Ok(re) => friendly_error(&re, node),
1045 Err(other) => other,
1046 })?;
1047 Ok(output)
1048 }
1049 }
1050 Commands::Interventions { name } => {
1051 let resp = authed_get(
1052 &client,
1053 format!("{url}/api/v1/sessions/{name}/interventions"),
1054 token.as_deref(),
1055 )
1056 .send()
1057 .await
1058 .map_err(|e| friendly_error(&e, node))?;
1059 let text = ok_or_api_error(resp).await?;
1060 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1061 Ok(format_interventions(&events))
1062 }
1063 Commands::Ui => {
1064 let dashboard = base_url(&cli.node);
1065 open_browser(&dashboard)?;
1066 Ok(format!("Opening {dashboard}"))
1067 }
1068 Commands::Resume { name } => {
1069 let resp = authed_post(
1070 &client,
1071 format!("{url}/api/v1/sessions/{name}/resume"),
1072 token.as_deref(),
1073 )
1074 .send()
1075 .await
1076 .map_err(|e| friendly_error(&e, node))?;
1077 let text = ok_or_api_error(resp).await?;
1078 let session: Session = serde_json::from_str(&text)?;
1079 Ok(format!("Resumed session \"{}\"", session.name))
1080 }
1081 Commands::Knowledge {
1082 session,
1083 kind,
1084 repo,
1085 ink,
1086 limit,
1087 context,
1088 get,
1089 delete,
1090 push,
1091 } => {
1092 if let Some(id) = get {
1094 let endpoint = format!("{url}/api/v1/knowledge/{id}");
1095 let resp = authed_get(&client, endpoint, token.as_deref())
1096 .send()
1097 .await
1098 .map_err(|e| friendly_error(&e, node))?;
1099 let text = ok_or_api_error(resp).await?;
1100 let resp: KnowledgeItemResponse = serde_json::from_str(&text)?;
1101 return Ok(format_knowledge(&[resp.knowledge]));
1102 }
1103
1104 if let Some(id) = delete {
1106 let endpoint = format!("{url}/api/v1/knowledge/{id}");
1107 let resp = authed_delete(&client, endpoint, token.as_deref())
1108 .send()
1109 .await
1110 .map_err(|e| friendly_error(&e, node))?;
1111 let text = ok_or_api_error(resp).await?;
1112 let resp: KnowledgeDeleteResponse = serde_json::from_str(&text)?;
1113 return Ok(if resp.deleted {
1114 format!("Deleted knowledge item {id}")
1115 } else {
1116 format!("Knowledge item {id} not found")
1117 });
1118 }
1119
1120 if *push {
1122 let endpoint = format!("{url}/api/v1/knowledge/push");
1123 let resp = authed_post(&client, endpoint, token.as_deref())
1124 .send()
1125 .await
1126 .map_err(|e| friendly_error(&e, node))?;
1127 let text = ok_or_api_error(resp).await?;
1128 let resp: KnowledgePushResponse = serde_json::from_str(&text)?;
1129 return Ok(resp.message);
1130 }
1131
1132 let mut params = vec![format!("limit={limit}")];
1134 let endpoint = if *context {
1135 if let Some(r) = repo {
1136 params.push(format!("workdir={r}"));
1137 }
1138 if let Some(i) = ink {
1139 params.push(format!("ink={i}"));
1140 }
1141 format!("{url}/api/v1/knowledge/context?{}", params.join("&"))
1142 } else {
1143 if let Some(s) = session {
1144 params.push(format!("session_id={s}"));
1145 }
1146 if let Some(k) = kind {
1147 params.push(format!("kind={k}"));
1148 }
1149 if let Some(r) = repo {
1150 params.push(format!("repo={r}"));
1151 }
1152 if let Some(i) = ink {
1153 params.push(format!("ink={i}"));
1154 }
1155 format!("{url}/api/v1/knowledge?{}", params.join("&"))
1156 };
1157 let resp = authed_get(&client, endpoint, token.as_deref())
1158 .send()
1159 .await
1160 .map_err(|e| friendly_error(&e, node))?;
1161 let text = ok_or_api_error(resp).await?;
1162 let resp: KnowledgeResponse = serde_json::from_str(&text)?;
1163 Ok(format_knowledge(&resp.knowledge))
1164 }
1165 Commands::Schedule { action } => execute_schedule(action, node),
1166 }
1167}
1168
1169#[cfg(test)]
1170mod tests {
1171 use super::*;
1172
1173 #[test]
1174 fn test_base_url() {
1175 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1176 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1177 }
1178
1179 #[test]
1180 fn test_cli_parse_list() {
1181 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1182 assert_eq!(cli.node, "localhost:7433");
1183 assert!(matches!(cli.command, Commands::List));
1184 }
1185
1186 #[test]
1187 fn test_cli_parse_nodes() {
1188 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1189 assert!(matches!(cli.command, Commands::Nodes));
1190 }
1191
1192 #[test]
1193 fn test_cli_parse_ui() {
1194 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1195 assert!(matches!(cli.command, Commands::Ui));
1196 }
1197
1198 #[test]
1199 fn test_cli_parse_ui_custom_node() {
1200 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1201 assert!(matches!(cli.command, Commands::Ui));
1202 assert_eq!(cli.node, "mac-mini:7433");
1203 }
1204
1205 #[test]
1206 fn test_build_open_command() {
1207 let cmd = build_open_command("http://localhost:7433");
1208 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1209 assert_eq!(args, vec!["http://localhost:7433"]);
1210 #[cfg(target_os = "macos")]
1211 assert_eq!(cmd.get_program(), "open");
1212 #[cfg(target_os = "linux")]
1213 assert_eq!(cmd.get_program(), "xdg-open");
1214 }
1215
1216 #[test]
1217 fn test_cli_parse_spawn() {
1218 let cli = Cli::try_parse_from([
1219 "pulpo",
1220 "spawn",
1221 "--workdir",
1222 "/tmp/repo",
1223 "Fix",
1224 "the",
1225 "bug",
1226 ])
1227 .unwrap();
1228 assert!(matches!(
1229 &cli.command,
1230 Commands::Spawn { workdir, provider, auto, unrestricted, prompt, .. }
1231 if workdir.as_deref() == Some("/tmp/repo") && provider.is_none() && !auto
1232 && !unrestricted && prompt == &["Fix", "the", "bug"]
1233 ));
1234 }
1235
1236 #[test]
1237 fn test_cli_parse_spawn_with_provider() {
1238 let cli = Cli::try_parse_from([
1239 "pulpo",
1240 "spawn",
1241 "--workdir",
1242 "/tmp",
1243 "--provider",
1244 "codex",
1245 "Do it",
1246 ])
1247 .unwrap();
1248 assert!(matches!(
1249 &cli.command,
1250 Commands::Spawn { provider, .. } if provider.as_deref() == Some("codex")
1251 ));
1252 }
1253
1254 #[test]
1255 fn test_cli_parse_spawn_auto() {
1256 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "--auto", "Do it"])
1257 .unwrap();
1258 assert!(matches!(
1259 &cli.command,
1260 Commands::Spawn { auto, .. } if *auto
1261 ));
1262 }
1263
1264 #[test]
1265 fn test_cli_parse_spawn_unrestricted() {
1266 let cli = Cli::try_parse_from([
1267 "pulpo",
1268 "spawn",
1269 "--workdir",
1270 "/tmp",
1271 "--unrestricted",
1272 "Do it",
1273 ])
1274 .unwrap();
1275 assert!(matches!(
1276 &cli.command,
1277 Commands::Spawn { unrestricted, .. } if *unrestricted
1278 ));
1279 }
1280
1281 #[test]
1282 fn test_cli_parse_spawn_unrestricted_default() {
1283 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "Do it"]).unwrap();
1284 assert!(matches!(
1285 &cli.command,
1286 Commands::Spawn { unrestricted, .. } if !unrestricted
1287 ));
1288 }
1289
1290 #[test]
1291 fn test_cli_parse_spawn_with_name() {
1292 let cli = Cli::try_parse_from([
1293 "pulpo",
1294 "spawn",
1295 "--workdir",
1296 "/tmp/repo",
1297 "--name",
1298 "my-task",
1299 "Fix it",
1300 ])
1301 .unwrap();
1302 assert!(matches!(
1303 &cli.command,
1304 Commands::Spawn { workdir, name, .. }
1305 if workdir.as_deref() == Some("/tmp/repo") && name.as_deref() == Some("my-task")
1306 ));
1307 }
1308
1309 #[test]
1310 fn test_cli_parse_spawn_without_name() {
1311 let cli =
1312 Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"]).unwrap();
1313 assert!(matches!(
1314 &cli.command,
1315 Commands::Spawn { name, .. } if name.is_none()
1316 ));
1317 }
1318
1319 #[test]
1320 fn test_cli_parse_spawn_with_conversation_id() {
1321 let cli = Cli::try_parse_from([
1322 "pulpo",
1323 "spawn",
1324 "--workdir",
1325 "/tmp/repo",
1326 "--conversation-id",
1327 "conv-abc-123",
1328 "Fix it",
1329 ])
1330 .unwrap();
1331 assert!(matches!(
1332 &cli.command,
1333 Commands::Spawn { conversation_id, .. }
1334 if conversation_id.as_deref() == Some("conv-abc-123")
1335 ));
1336 }
1337
1338 #[test]
1339 fn test_cli_parse_spawn_without_conversation_id() {
1340 let cli =
1341 Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"]).unwrap();
1342 assert!(matches!(
1343 &cli.command,
1344 Commands::Spawn { conversation_id, .. } if conversation_id.is_none()
1345 ));
1346 }
1347
1348 #[test]
1349 fn test_cli_parse_logs() {
1350 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1351 assert!(matches!(
1352 &cli.command,
1353 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1354 ));
1355 }
1356
/// `logs --lines 50` overrides the default line count and leaves follow off.
#[test]
fn test_cli_parse_logs_with_lines() {
    let argv = ["pulpo", "logs", "my-session", "--lines", "50"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Logs {
        name,
        lines,
        follow,
    } = &parsed.command
    else {
        panic!("expected Commands::Logs, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(*lines, 50);
    assert!(!*follow);
}
1365
/// The long `--follow` flag turns follow mode on.
#[test]
fn test_cli_parse_logs_follow() {
    let argv = ["pulpo", "logs", "my-session", "--follow"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Logs { name, follow, .. } = &parsed.command else {
        panic!("expected Commands::Logs, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(*follow);
}
1374
/// The short `-f` flag turns follow mode on, same as `--follow`.
#[test]
fn test_cli_parse_logs_follow_short() {
    let argv = ["pulpo", "logs", "my-session", "-f"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Logs { name, follow, .. } = &parsed.command else {
        panic!("expected Commands::Logs, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(*follow);
}
1383
/// `kill <name>` parses into `Commands::Kill` with the given session name.
#[test]
fn test_cli_parse_kill() {
    let parsed = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
    let Commands::Kill { name } = &parsed.command else {
        panic!("expected Commands::Kill, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
1392
/// `delete <name>` parses into `Commands::Delete` with the given session name.
#[test]
fn test_cli_parse_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
    let Commands::Delete { name } = &parsed.command else {
        panic!("expected Commands::Delete, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
1401
/// `resume <name>` parses into `Commands::Resume` with the given session name.
#[test]
fn test_cli_parse_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
    let Commands::Resume { name } = &parsed.command else {
        panic!("expected Commands::Resume, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
1410
/// `input <name> <text>` captures both the session name and the text to send.
#[test]
fn test_cli_parse_input() {
    let argv = ["pulpo", "input", "my-session", "yes"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Input { name, text } = &parsed.command else {
        panic!("expected Commands::Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("yes"));
}
1419
/// `input <name>` with no text argument leaves `text` unset.
#[test]
fn test_cli_parse_input_no_text() {
    let parsed = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
    let Commands::Input { name, text } = &parsed.command else {
        panic!("expected Commands::Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(text.is_none());
}
1428
/// The `i` alias resolves to the `input` subcommand.
#[test]
fn test_cli_parse_input_alias() {
    let argv = ["pulpo", "i", "my-session", "y"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Input { name, text } = &parsed.command else {
        panic!("expected Commands::Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("y"));
}
1437
/// A global `--node` given before the subcommand overrides the default node address.
#[test]
fn test_cli_parse_custom_node() {
    let argv = ["pulpo", "--node", "win-pc:8080", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert_eq!(parsed.node, "win-pc:8080");
}
1443
/// `--version` makes parsing return a clap error of kind `DisplayVersion`
/// instead of a parsed `Cli`.
#[test]
fn test_cli_version() {
    let version_err = Cli::try_parse_from(["pulpo", "--version"]).unwrap_err();
    assert_eq!(version_err.kind(), clap::error::ErrorKind::DisplayVersion);
}
1451
/// Invoking the binary with no subcommand is a parse error.
#[test]
fn test_cli_parse_no_subcommand_fails() {
    assert!(Cli::try_parse_from(["pulpo"]).is_err());
}
1457
/// The `Debug` rendering of a parsed `Cli` mentions the selected subcommand.
#[test]
fn test_cli_debug() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    let rendered = format!("{parsed:?}");
    assert!(rendered.contains("List"));
}
1464
/// The unit variant `Commands::List` debug-prints as just its name.
#[test]
fn test_commands_debug() {
    let rendered = format!("{:?}", Commands::List);
    assert_eq!(rendered, "List");
}
1470
/// Canned JSON for a single running, interactive session, served by the stub
/// HTTP servers in these tests.
const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","provider":"claude","prompt":"Fix bug","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;

/// Wraps [`TEST_SESSION_JSON`] in a `{"session": ...}` envelope, the body the
/// stub server returns for session creation.
fn test_create_response_json() -> String {
    let mut envelope = String::from(r#"{"session":"#);
    envelope.push_str(TEST_SESSION_JSON);
    envelope.push('}');
    envelope
}
1478
1479 async fn start_test_server() -> String {
1481 use axum::http::StatusCode;
1482 use axum::{
1483 Json, Router,
1484 routing::{get, post},
1485 };
1486
1487 let create_json = test_create_response_json();
1488
1489 let app = Router::new()
1490 .route(
1491 "/api/v1/sessions",
1492 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1493 (StatusCode::CREATED, create_json.clone())
1494 }),
1495 )
1496 .route(
1497 "/api/v1/sessions/{id}",
1498 get(|| async { TEST_SESSION_JSON.to_owned() })
1499 .delete(|| async { StatusCode::NO_CONTENT }),
1500 )
1501 .route(
1502 "/api/v1/sessions/{id}/kill",
1503 post(|| async { StatusCode::NO_CONTENT }),
1504 )
1505 .route(
1506 "/api/v1/sessions/{id}/output",
1507 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1508 )
1509 .route(
1510 "/api/v1/peers",
1511 get(|| async {
1512 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1513 }),
1514 )
1515 .route(
1516 "/api/v1/sessions/{id}/resume",
1517 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1518 )
1519 .route(
1520 "/api/v1/sessions/{id}/interventions",
1521 get(|| async { "[]".to_owned() }),
1522 )
1523 .route(
1524 "/api/v1/sessions/{id}/input",
1525 post(|| async { StatusCode::NO_CONTENT }),
1526 );
1527
1528 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1529 let addr = listener.local_addr().unwrap();
1530 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1531 format!("127.0.0.1:{}", addr.port())
1532 }
1533
1534 #[tokio::test]
1535 async fn test_execute_list_success() {
1536 let node = start_test_server().await;
1537 let cli = Cli {
1538 node,
1539 token: None,
1540 command: Commands::List,
1541 };
1542 let result = execute(&cli).await.unwrap();
1543 assert_eq!(result, "No sessions.");
1544 }
1545
1546 #[tokio::test]
1547 async fn test_execute_nodes_success() {
1548 let node = start_test_server().await;
1549 let cli = Cli {
1550 node,
1551 token: None,
1552 command: Commands::Nodes,
1553 };
1554 let result = execute(&cli).await.unwrap();
1555 assert!(result.contains("test"));
1556 assert!(result.contains("(local)"));
1557 assert!(result.contains("NAME"));
1558 }
1559
1560 #[tokio::test]
1561 async fn test_execute_spawn_success() {
1562 let node = start_test_server().await;
1563 let cli = Cli {
1564 node,
1565 token: None,
1566 command: Commands::Spawn {
1567 workdir: Some("/tmp/repo".into()),
1568 name: None,
1569 provider: Some("claude".into()),
1570 auto: false,
1571 unrestricted: false,
1572 model: None,
1573 system_prompt: None,
1574 allowed_tools: None,
1575 ink: None,
1576 max_turns: None,
1577 max_budget: None,
1578 output_format: None,
1579 worktree: false,
1580 conversation_id: None,
1581 prompt: vec!["Fix".into(), "bug".into()],
1582 },
1583 };
1584 let result = execute(&cli).await.unwrap();
1585 assert!(result.contains("Created session"));
1586 assert!(result.contains("repo"));
1587 }
1588
1589 #[tokio::test]
1590 async fn test_execute_spawn_with_all_new_flags() {
1591 let node = start_test_server().await;
1592 let cli = Cli {
1593 node,
1594 token: None,
1595 command: Commands::Spawn {
1596 workdir: Some("/tmp/repo".into()),
1597 name: None,
1598 provider: Some("claude".into()),
1599 auto: false,
1600 unrestricted: false,
1601 model: Some("opus".into()),
1602 system_prompt: Some("Be helpful".into()),
1603 allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1604 ink: Some("coder".into()),
1605 max_turns: Some(5),
1606 max_budget: Some(2.5),
1607 output_format: Some("json".into()),
1608 worktree: false,
1609 conversation_id: None,
1610 prompt: vec!["Fix".into(), "bug".into()],
1611 },
1612 };
1613 let result = execute(&cli).await.unwrap();
1614 assert!(result.contains("Created session"));
1615 }
1616
1617 #[tokio::test]
1618 async fn test_execute_spawn_auto_mode() {
1619 let node = start_test_server().await;
1620 let cli = Cli {
1621 node,
1622 token: None,
1623 command: Commands::Spawn {
1624 workdir: Some("/tmp/repo".into()),
1625 name: None,
1626 provider: Some("claude".into()),
1627 auto: true,
1628 unrestricted: false,
1629 model: None,
1630 system_prompt: None,
1631 allowed_tools: None,
1632 ink: None,
1633 max_turns: None,
1634 max_budget: None,
1635 output_format: None,
1636 worktree: false,
1637 conversation_id: None,
1638 prompt: vec!["Do it".into()],
1639 },
1640 };
1641 let result = execute(&cli).await.unwrap();
1642 assert!(result.contains("Created session"));
1643 }
1644
1645 #[tokio::test]
1646 async fn test_execute_spawn_with_name() {
1647 let node = start_test_server().await;
1648 let cli = Cli {
1649 node,
1650 token: None,
1651 command: Commands::Spawn {
1652 workdir: Some("/tmp/repo".into()),
1653 name: Some("my-task".into()),
1654 provider: Some("claude".into()),
1655 auto: false,
1656 unrestricted: false,
1657 model: None,
1658 system_prompt: None,
1659 allowed_tools: None,
1660 ink: None,
1661 max_turns: None,
1662 max_budget: None,
1663 output_format: None,
1664 worktree: false,
1665 conversation_id: None,
1666 prompt: vec!["Fix".into(), "bug".into()],
1667 },
1668 };
1669 let result = execute(&cli).await.unwrap();
1670 assert!(result.contains("Created session"));
1671 }
1672
1673 #[tokio::test]
1674 async fn test_execute_kill_success() {
1675 let node = start_test_server().await;
1676 let cli = Cli {
1677 node,
1678 token: None,
1679 command: Commands::Kill {
1680 name: "test-session".into(),
1681 },
1682 };
1683 let result = execute(&cli).await.unwrap();
1684 assert!(result.contains("killed"));
1685 }
1686
1687 #[tokio::test]
1688 async fn test_execute_delete_success() {
1689 let node = start_test_server().await;
1690 let cli = Cli {
1691 node,
1692 token: None,
1693 command: Commands::Delete {
1694 name: "test-session".into(),
1695 },
1696 };
1697 let result = execute(&cli).await.unwrap();
1698 assert!(result.contains("deleted"));
1699 }
1700
1701 #[tokio::test]
1702 async fn test_execute_logs_success() {
1703 let node = start_test_server().await;
1704 let cli = Cli {
1705 node,
1706 token: None,
1707 command: Commands::Logs {
1708 name: "test-session".into(),
1709 lines: 50,
1710 follow: false,
1711 },
1712 };
1713 let result = execute(&cli).await.unwrap();
1714 assert!(result.contains("test output"));
1715 }
1716
1717 #[tokio::test]
1718 async fn test_execute_list_connection_refused() {
1719 let cli = Cli {
1720 node: "localhost:1".into(),
1721 token: None,
1722 command: Commands::List,
1723 };
1724 let result = execute(&cli).await;
1725 let err = result.unwrap_err().to_string();
1726 assert!(
1727 err.contains("Could not connect to pulpod"),
1728 "Expected friendly error, got: {err}"
1729 );
1730 assert!(err.contains("localhost:1"));
1731 }
1732
1733 #[tokio::test]
1734 async fn test_execute_nodes_connection_refused() {
1735 let cli = Cli {
1736 node: "localhost:1".into(),
1737 token: None,
1738 command: Commands::Nodes,
1739 };
1740 let result = execute(&cli).await;
1741 let err = result.unwrap_err().to_string();
1742 assert!(err.contains("Could not connect to pulpod"));
1743 }
1744
1745 #[tokio::test]
1746 async fn test_execute_kill_error_response() {
1747 use axum::{Router, http::StatusCode, routing::post};
1748
1749 let app = Router::new().route(
1750 "/api/v1/sessions/{id}/kill",
1751 post(|| async {
1752 (
1753 StatusCode::NOT_FOUND,
1754 "{\"error\":\"session not found: test-session\"}",
1755 )
1756 }),
1757 );
1758 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1759 let addr = listener.local_addr().unwrap();
1760 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1761 let node = format!("127.0.0.1:{}", addr.port());
1762
1763 let cli = Cli {
1764 node,
1765 token: None,
1766 command: Commands::Kill {
1767 name: "test-session".into(),
1768 },
1769 };
1770 let err = execute(&cli).await.unwrap_err();
1771 assert_eq!(err.to_string(), "session not found: test-session");
1772 }
1773
1774 #[tokio::test]
1775 async fn test_execute_delete_error_response() {
1776 use axum::{Router, http::StatusCode, routing::delete};
1777
1778 let app = Router::new().route(
1779 "/api/v1/sessions/{id}",
1780 delete(|| async {
1781 (
1782 StatusCode::CONFLICT,
1783 "{\"error\":\"cannot delete session in 'running' state\"}",
1784 )
1785 }),
1786 );
1787 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1788 let addr = listener.local_addr().unwrap();
1789 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1790 let node = format!("127.0.0.1:{}", addr.port());
1791
1792 let cli = Cli {
1793 node,
1794 token: None,
1795 command: Commands::Delete {
1796 name: "test-session".into(),
1797 },
1798 };
1799 let err = execute(&cli).await.unwrap_err();
1800 assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1801 }
1802
1803 #[tokio::test]
1804 async fn test_execute_logs_error_response() {
1805 use axum::{Router, http::StatusCode, routing::get};
1806
1807 let app = Router::new().route(
1808 "/api/v1/sessions/{id}/output",
1809 get(|| async {
1810 (
1811 StatusCode::NOT_FOUND,
1812 "{\"error\":\"session not found: ghost\"}",
1813 )
1814 }),
1815 );
1816 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1817 let addr = listener.local_addr().unwrap();
1818 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1819 let node = format!("127.0.0.1:{}", addr.port());
1820
1821 let cli = Cli {
1822 node,
1823 token: None,
1824 command: Commands::Logs {
1825 name: "ghost".into(),
1826 lines: 50,
1827 follow: false,
1828 },
1829 };
1830 let err = execute(&cli).await.unwrap_err();
1831 assert_eq!(err.to_string(), "session not found: ghost");
1832 }
1833
1834 #[tokio::test]
1835 async fn test_execute_resume_error_response() {
1836 use axum::{Router, http::StatusCode, routing::post};
1837
1838 let app = Router::new().route(
1839 "/api/v1/sessions/{id}/resume",
1840 post(|| async {
1841 (
1842 StatusCode::BAD_REQUEST,
1843 "{\"error\":\"session is not stale (status: running)\"}",
1844 )
1845 }),
1846 );
1847 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1848 let addr = listener.local_addr().unwrap();
1849 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1850 let node = format!("127.0.0.1:{}", addr.port());
1851
1852 let cli = Cli {
1853 node,
1854 token: None,
1855 command: Commands::Resume {
1856 name: "test-session".into(),
1857 },
1858 };
1859 let err = execute(&cli).await.unwrap_err();
1860 assert_eq!(err.to_string(), "session is not stale (status: running)");
1861 }
1862
1863 #[tokio::test]
1864 async fn test_execute_spawn_error_response() {
1865 use axum::{Router, http::StatusCode, routing::post};
1866
1867 let app = Router::new().route(
1868 "/api/v1/sessions",
1869 post(|| async {
1870 (
1871 StatusCode::INTERNAL_SERVER_ERROR,
1872 "{\"error\":\"failed to spawn session\"}",
1873 )
1874 }),
1875 );
1876 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1877 let addr = listener.local_addr().unwrap();
1878 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1879 let node = format!("127.0.0.1:{}", addr.port());
1880
1881 let cli = Cli {
1882 node,
1883 token: None,
1884 command: Commands::Spawn {
1885 workdir: Some("/tmp/repo".into()),
1886 name: None,
1887 provider: Some("claude".into()),
1888 auto: false,
1889 unrestricted: false,
1890 model: None,
1891 system_prompt: None,
1892 allowed_tools: None,
1893 ink: None,
1894 max_turns: None,
1895 max_budget: None,
1896 output_format: None,
1897 worktree: false,
1898 conversation_id: None,
1899 prompt: vec!["test".into()],
1900 },
1901 };
1902 let err = execute(&cli).await.unwrap_err();
1903 assert_eq!(err.to_string(), "failed to spawn session");
1904 }
1905
1906 #[tokio::test]
1907 async fn test_execute_interventions_error_response() {
1908 use axum::{Router, http::StatusCode, routing::get};
1909
1910 let app = Router::new().route(
1911 "/api/v1/sessions/{id}/interventions",
1912 get(|| async {
1913 (
1914 StatusCode::NOT_FOUND,
1915 "{\"error\":\"session not found: ghost\"}",
1916 )
1917 }),
1918 );
1919 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1920 let addr = listener.local_addr().unwrap();
1921 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1922 let node = format!("127.0.0.1:{}", addr.port());
1923
1924 let cli = Cli {
1925 node,
1926 token: None,
1927 command: Commands::Interventions {
1928 name: "ghost".into(),
1929 },
1930 };
1931 let err = execute(&cli).await.unwrap_err();
1932 assert_eq!(err.to_string(), "session not found: ghost");
1933 }
1934
1935 #[tokio::test]
1936 async fn test_execute_resume_success() {
1937 let node = start_test_server().await;
1938 let cli = Cli {
1939 node,
1940 token: None,
1941 command: Commands::Resume {
1942 name: "test-session".into(),
1943 },
1944 };
1945 let result = execute(&cli).await.unwrap();
1946 assert!(result.contains("Resumed session"));
1947 assert!(result.contains("repo"));
1948 }
1949
1950 #[tokio::test]
1951 async fn test_execute_input_success() {
1952 let node = start_test_server().await;
1953 let cli = Cli {
1954 node,
1955 token: None,
1956 command: Commands::Input {
1957 name: "test-session".into(),
1958 text: Some("yes".into()),
1959 },
1960 };
1961 let result = execute(&cli).await.unwrap();
1962 assert!(result.contains("Sent input to session test-session"));
1963 }
1964
1965 #[tokio::test]
1966 async fn test_execute_input_no_text() {
1967 let node = start_test_server().await;
1968 let cli = Cli {
1969 node,
1970 token: None,
1971 command: Commands::Input {
1972 name: "test-session".into(),
1973 text: None,
1974 },
1975 };
1976 let result = execute(&cli).await.unwrap();
1977 assert!(result.contains("Sent input to session test-session"));
1978 }
1979
1980 #[tokio::test]
1981 async fn test_execute_input_connection_refused() {
1982 let cli = Cli {
1983 node: "localhost:1".into(),
1984 token: None,
1985 command: Commands::Input {
1986 name: "test".into(),
1987 text: Some("y".into()),
1988 },
1989 };
1990 let result = execute(&cli).await;
1991 let err = result.unwrap_err().to_string();
1992 assert!(err.contains("Could not connect to pulpod"));
1993 }
1994
1995 #[tokio::test]
1996 async fn test_execute_input_error_response() {
1997 use axum::{Router, http::StatusCode, routing::post};
1998
1999 let app = Router::new().route(
2000 "/api/v1/sessions/{id}/input",
2001 post(|| async {
2002 (
2003 StatusCode::NOT_FOUND,
2004 "{\"error\":\"session not found: ghost\"}",
2005 )
2006 }),
2007 );
2008 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2009 let addr = listener.local_addr().unwrap();
2010 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2011 let node = format!("127.0.0.1:{}", addr.port());
2012
2013 let cli = Cli {
2014 node,
2015 token: None,
2016 command: Commands::Input {
2017 name: "ghost".into(),
2018 text: Some("y".into()),
2019 },
2020 };
2021 let err = execute(&cli).await.unwrap_err();
2022 assert_eq!(err.to_string(), "session not found: ghost");
2023 }
2024
2025 #[tokio::test]
2026 async fn test_execute_ui() {
2027 let cli = Cli {
2028 node: "localhost:7433".into(),
2029 token: None,
2030 command: Commands::Ui,
2031 };
2032 let result = execute(&cli).await.unwrap();
2033 assert!(result.contains("Opening"));
2034 assert!(result.contains("http://localhost:7433"));
2035 }
2036
2037 #[tokio::test]
2038 async fn test_execute_ui_custom_node() {
2039 let cli = Cli {
2040 node: "mac-mini:7433".into(),
2041 token: None,
2042 command: Commands::Ui,
2043 };
2044 let result = execute(&cli).await.unwrap();
2045 assert!(result.contains("http://mac-mini:7433"));
2046 }
2047
/// An empty session list formats as the "No sessions." placeholder.
#[test]
fn test_format_sessions_empty() {
    let rendered = format_sessions(&[]);
    assert_eq!(rendered, "No sessions.");
}
2052
/// A single running session is rendered as a table containing the header and
/// the session's name, status, provider and prompt.
#[test]
fn test_format_sessions_with_data() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "my-api".into(),
        workdir: "/tmp/repo".into(),
        provider: Provider::Claude,
        prompt: "Fix the bug".into(),
        status: SessionStatus::Running,
        mode: SessionMode::Interactive,
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        waiting_for_input: false,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let table = format_sessions(&[session]);
    for needle in ["NAME", "my-api", "running", "claude", "Fix the bug"] {
        assert!(table.contains(needle), "missing {needle:?} in: {table}");
    }
}
2095
/// A prompt longer than forty characters is shown with an ellipsis.
#[test]
fn test_format_sessions_long_prompt_truncated() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        provider: Provider::Codex,
        prompt: "A very long prompt that exceeds forty characters in total length".into(),
        status: SessionStatus::Completed,
        mode: SessionMode::Autonomous,
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        waiting_for_input: false,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let table = format_sessions(&[session]);
    assert!(table.contains("..."));
}
2134
/// A running session flagged `waiting_for_input` is displayed as "waiting",
/// and the word "running" does not appear in the output.
#[test]
fn test_format_sessions_waiting_for_input() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "blocked".into(),
        workdir: "/tmp".into(),
        provider: Provider::Claude,
        prompt: "Fix bug".into(),
        status: SessionStatus::Running,
        mode: SessionMode::Interactive,
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        waiting_for_input: true,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };
    let table = format_sessions(&[session]);
    assert!(table.contains("waiting"));
    assert!(!table.contains("running"));
}
2174
/// The node table lists the local node (with its `(local)` marker) and each
/// peer, including the peer's session count.
#[test]
fn test_format_nodes() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let resp = PeersResponse {
        local: NodeInfo {
            name: "mac-mini".into(),
            hostname: "h".into(),
            os: "macos".into(),
            arch: "arm64".into(),
            cpus: 8,
            memory_mb: 16384,
            gpu: None,
        },
        peers: vec![PeerInfo {
            name: "win-pc".into(),
            address: "win-pc:7433".into(),
            status: PeerStatus::Online,
            node_info: None,
            // "42" appears nowhere else in this fixture (the address contains
            // a '3', the memory size a '3' and a '4'), so finding it in the
            // output shows that the session count itself was rendered.
            session_count: Some(42),
            source: PeerSource::Configured,
        }],
    };
    let output = format_nodes(&resp);
    assert!(output.contains("mac-mini"));
    assert!(output.contains("(local)"));
    assert!(output.contains("win-pc"));
    assert!(
        output.contains("42"),
        "expected session count 42 in: {output}"
    );
}
2205
/// An offline peer with no session count is shown as "offline", and the third
/// output line (presumably the peer row, after header and local node) contains
/// a `-` placeholder.
#[test]
fn test_format_nodes_no_session_count() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "local".into(),
        hostname: "h".into(),
        os: "linux".into(),
        arch: "x86_64".into(),
        cpus: 4,
        memory_mb: 8192,
        gpu: None,
    };
    let offline_peer = PeerInfo {
        name: "peer".into(),
        address: "peer:7433".into(),
        status: PeerStatus::Offline,
        node_info: None,
        session_count: None,
        source: PeerSource::Configured,
    };
    let resp = PeersResponse {
        local,
        peers: vec![offline_peer],
    };

    let table = format_nodes(&resp);
    assert!(table.contains("offline"));
    let third_line = table.lines().nth(2).unwrap();
    assert!(third_line.contains('-'));
}
2236
2237 #[tokio::test]
2238 async fn test_execute_resume_connection_refused() {
2239 let cli = Cli {
2240 node: "localhost:1".into(),
2241 token: None,
2242 command: Commands::Resume {
2243 name: "test".into(),
2244 },
2245 };
2246 let result = execute(&cli).await;
2247 let err = result.unwrap_err().to_string();
2248 assert!(err.contains("Could not connect to pulpod"));
2249 }
2250
2251 #[tokio::test]
2252 async fn test_execute_spawn_connection_refused() {
2253 let cli = Cli {
2254 node: "localhost:1".into(),
2255 token: None,
2256 command: Commands::Spawn {
2257 workdir: Some("/tmp".into()),
2258 name: None,
2259 provider: Some("claude".into()),
2260 auto: false,
2261 unrestricted: false,
2262 model: None,
2263 system_prompt: None,
2264 allowed_tools: None,
2265 ink: None,
2266 max_turns: None,
2267 max_budget: None,
2268 output_format: None,
2269 worktree: false,
2270 conversation_id: None,
2271 prompt: vec!["test".into()],
2272 },
2273 };
2274 let result = execute(&cli).await;
2275 let err = result.unwrap_err().to_string();
2276 assert!(err.contains("Could not connect to pulpod"));
2277 }
2278
2279 #[tokio::test]
2280 async fn test_execute_kill_connection_refused() {
2281 let cli = Cli {
2282 node: "localhost:1".into(),
2283 token: None,
2284 command: Commands::Kill {
2285 name: "test".into(),
2286 },
2287 };
2288 let result = execute(&cli).await;
2289 let err = result.unwrap_err().to_string();
2290 assert!(err.contains("Could not connect to pulpod"));
2291 }
2292
2293 #[tokio::test]
2294 async fn test_execute_delete_connection_refused() {
2295 let cli = Cli {
2296 node: "localhost:1".into(),
2297 token: None,
2298 command: Commands::Delete {
2299 name: "test".into(),
2300 },
2301 };
2302 let result = execute(&cli).await;
2303 let err = result.unwrap_err().to_string();
2304 assert!(err.contains("Could not connect to pulpod"));
2305 }
2306
2307 #[tokio::test]
2308 async fn test_execute_logs_connection_refused() {
2309 let cli = Cli {
2310 node: "localhost:1".into(),
2311 token: None,
2312 command: Commands::Logs {
2313 name: "test".into(),
2314 lines: 50,
2315 follow: false,
2316 },
2317 };
2318 let result = execute(&cli).await;
2319 let err = result.unwrap_err().to_string();
2320 assert!(err.contains("Could not connect to pulpod"));
2321 }
2322
2323 #[tokio::test]
2324 async fn test_friendly_error_connect() {
2325 let err = reqwest::Client::new()
2327 .get("http://127.0.0.1:1")
2328 .send()
2329 .await
2330 .unwrap_err();
2331 let friendly = friendly_error(&err, "test-node:1");
2332 let msg = friendly.to_string();
2333 assert!(
2334 msg.contains("Could not connect"),
2335 "Expected connect message, got: {msg}"
2336 );
2337 }
2338
2339 #[tokio::test]
2340 async fn test_friendly_error_other() {
2341 let err = reqwest::Client::new()
2343 .get("http://[::invalid::url")
2344 .send()
2345 .await
2346 .unwrap_err();
2347 let friendly = friendly_error(&err, "bad-host");
2348 let msg = friendly.to_string();
2349 assert!(
2350 msg.contains("Network error"),
2351 "Expected network error message, got: {msg}"
2352 );
2353 assert!(msg.contains("bad-host"));
2354 }
2355
/// `is_localhost` accepts the loopback spellings (with or without a port) and
/// rejects other hostnames and LAN addresses.
#[test]
fn test_is_localhost_variants() {
    let loopback = [
        "localhost:7433",
        "127.0.0.1:7433",
        "[::1]:7433",
        "::1",
        "localhost",
    ];
    for node in loopback {
        assert!(is_localhost(node), "{node} should count as localhost");
    }

    let remote = ["mac-mini:7433", "192.168.1.100:7433"];
    for node in remote {
        assert!(!is_localhost(node), "{node} should not count as localhost");
    }
}
2368
/// `authed_get` with a token attaches it as a `Bearer` authorization header.
#[test]
fn test_authed_get_with_token() {
    let client = reqwest::Client::new();
    let request = authed_get(&client, "http://h:1/api".into(), Some("tok"))
        .build()
        .unwrap();
    let header = request
        .headers()
        .get("authorization")
        .and_then(|value| value.to_str().ok());
    assert_eq!(header, Some("Bearer tok"));
}
2383
/// `authed_get` without a token sends no authorization header.
#[test]
fn test_authed_get_without_token() {
    let client = reqwest::Client::new();
    let request = authed_get(&client, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert!(!request.headers().contains_key("authorization"));
}
2392
/// `authed_post` with a token attaches it as a `Bearer` authorization header.
#[test]
fn test_authed_post_with_token() {
    let client = reqwest::Client::new();
    let request = authed_post(&client, "http://h:1/api".into(), Some("secret"))
        .build()
        .unwrap();
    let header = request
        .headers()
        .get("authorization")
        .and_then(|value| value.to_str().ok());
    assert_eq!(header, Some("Bearer secret"));
}
2407
/// `authed_post` without a token sends no authorization header.
#[test]
fn test_authed_post_without_token() {
    let client = reqwest::Client::new();
    let request = authed_post(&client, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert!(!request.headers().contains_key("authorization"));
}
2416
/// `authed_delete` with a token attaches it as a `Bearer` authorization header.
#[test]
fn test_authed_delete_with_token() {
    let client = reqwest::Client::new();
    let request = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
        .build()
        .unwrap();
    let header = request
        .headers()
        .get("authorization")
        .and_then(|value| value.to_str().ok());
    assert_eq!(header, Some("Bearer del-tok"));
}
2431
/// `authed_delete` without a token sends no authorization header.
#[test]
fn test_authed_delete_without_token() {
    let client = reqwest::Client::new();
    let request = authed_delete(&client, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert!(!request.headers().contains_key("authorization"));
}
2440
2441 #[tokio::test]
2442 async fn test_resolve_token_explicit() {
2443 let client = reqwest::Client::new();
2444 let token =
2445 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2446 assert_eq!(token, Some("my-tok".into()));
2447 }
2448
2449 #[tokio::test]
2450 async fn test_resolve_token_remote_no_explicit() {
2451 let client = reqwest::Client::new();
2452 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2453 assert_eq!(token, None);
2454 }
2455
2456 #[tokio::test]
2457 async fn test_resolve_token_localhost_auto_discover() {
2458 use axum::{Json, Router, routing::get};
2459
2460 let app = Router::new().route(
2461 "/api/v1/auth/token",
2462 get(|| async {
2463 Json(AuthTokenResponse {
2464 token: "discovered".into(),
2465 })
2466 }),
2467 );
2468 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2469 let addr = listener.local_addr().unwrap();
2470 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2471
2472 let node = format!("localhost:{}", addr.port());
2473 let base = base_url(&node);
2474 let client = reqwest::Client::new();
2475 let token = resolve_token(&client, &base, &node, None).await;
2476 assert_eq!(token, Some("discovered".into()));
2477 }
2478
2479 #[tokio::test]
2480 async fn test_discover_token_empty_returns_none() {
2481 use axum::{Json, Router, routing::get};
2482
2483 let app = Router::new().route(
2484 "/api/v1/auth/token",
2485 get(|| async {
2486 Json(AuthTokenResponse {
2487 token: String::new(),
2488 })
2489 }),
2490 );
2491 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2492 let addr = listener.local_addr().unwrap();
2493 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2494
2495 let base = format!("http://127.0.0.1:{}", addr.port());
2496 let client = reqwest::Client::new();
2497 assert_eq!(discover_token(&client, &base).await, None);
2498 }
2499
2500 #[tokio::test]
2501 async fn test_discover_token_unreachable_returns_none() {
2502 let client = reqwest::Client::new();
2503 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2504 }
2505
/// `--token <value>` ahead of the subcommand is stored in `Cli::token`.
#[test]
fn test_cli_parse_with_token() {
    let argv = ["pulpo", "--token", "my-secret", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert_eq!(parsed.token.as_deref(), Some("my-secret"));
}
2511
/// Without `--token`, `Cli::token` is `None`.
#[test]
fn test_cli_parse_without_token() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    assert!(parsed.token.is_none());
}
2517
2518 #[tokio::test]
2519 async fn test_execute_with_explicit_token_sends_header() {
2520 use axum::{Router, extract::Request, http::StatusCode, routing::get};
2521
2522 let app = Router::new().route(
2523 "/api/v1/sessions",
2524 get(|req: Request| async move {
2525 let auth = req
2526 .headers()
2527 .get("authorization")
2528 .and_then(|v| v.to_str().ok())
2529 .unwrap_or("");
2530 assert_eq!(auth, "Bearer test-token");
2531 (StatusCode::OK, "[]".to_owned())
2532 }),
2533 );
2534 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2535 let addr = listener.local_addr().unwrap();
2536 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2537 let node = format!("127.0.0.1:{}", addr.port());
2538
2539 let cli = Cli {
2540 node,
2541 token: Some("test-token".into()),
2542 command: Commands::List,
2543 };
2544 let result = execute(&cli).await.unwrap();
2545 assert_eq!(result, "No sessions.");
2546 }
2547
/// `pulpo interventions <name>` parses into `Commands::Interventions`.
#[test]
fn test_cli_parse_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
    let Commands::Interventions { name } = &parsed.command else {
        panic!("expected Interventions command");
    };
    assert_eq!(name, "my-session");
}
2558
/// An empty event list renders as the "No intervention events." placeholder.
#[test]
fn test_format_interventions_empty() {
    let rendered = format_interventions(&[]);
    assert_eq!(rendered, "No intervention events.");
}
2563
/// The rendering of two events contains the column headers, both reasons and
/// the first event's timestamp.
#[test]
fn test_format_interventions_with_data() {
    let first = InterventionEventResponse {
        id: 1,
        session_id: "sess-1".into(),
        reason: "Memory exceeded threshold".into(),
        created_at: "2026-01-01T00:00:00Z".into(),
    };
    let second = InterventionEventResponse {
        id: 2,
        session_id: "sess-1".into(),
        reason: "Idle for 10 minutes".into(),
        created_at: "2026-01-02T00:00:00Z".into(),
    };
    let events = vec![first, second];

    let rendered = format_interventions(&events);
    let expected_fragments = [
        "ID",
        "TIMESTAMP",
        "REASON",
        "Memory exceeded threshold",
        "Idle for 10 minutes",
        "2026-01-01T00:00:00Z",
    ];
    for fragment in expected_fragments {
        assert!(rendered.contains(fragment));
    }
}
2588
2589 #[tokio::test]
2590 async fn test_execute_interventions_empty() {
2591 let node = start_test_server().await;
2592 let cli = Cli {
2593 node,
2594 token: None,
2595 command: Commands::Interventions {
2596 name: "my-session".into(),
2597 },
2598 };
2599 let result = execute(&cli).await.unwrap();
2600 assert_eq!(result, "No intervention events.");
2601 }
2602
2603 #[tokio::test]
2604 async fn test_execute_interventions_with_data() {
2605 use axum::{Router, routing::get};
2606
2607 let app = Router::new().route(
2608 "/api/v1/sessions/{id}/interventions",
2609 get(|| async {
2610 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2611 .to_owned()
2612 }),
2613 );
2614 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2615 let addr = listener.local_addr().unwrap();
2616 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2617 let node = format!("127.0.0.1:{}", addr.port());
2618
2619 let cli = Cli {
2620 node,
2621 token: None,
2622 command: Commands::Interventions {
2623 name: "test".into(),
2624 },
2625 };
2626 let result = execute(&cli).await.unwrap();
2627 assert!(result.contains("OOM"));
2628 assert!(result.contains("2026-01-01T00:00:00Z"));
2629 }
2630
2631 #[tokio::test]
2632 async fn test_execute_interventions_connection_refused() {
2633 let cli = Cli {
2634 node: "localhost:1".into(),
2635 token: None,
2636 command: Commands::Interventions {
2637 name: "test".into(),
2638 },
2639 };
2640 let result = execute(&cli).await;
2641 let err = result.unwrap_err().to_string();
2642 assert!(err.contains("Could not connect to pulpod"));
2643 }
2644
/// `build_attach_command` produces `tmux attach-session -t <name>`.
#[test]
fn test_build_attach_command() {
    let command = build_attach_command("my-session");
    assert_eq!(command.get_program(), "tmux");
    let argv = command.get_args().collect::<Vec<&std::ffi::OsStr>>();
    assert_eq!(argv, vec!["attach-session", "-t", "my-session"]);
}
2654
/// `pulpo attach <name>` parses into `Commands::Attach`.
#[test]
fn test_cli_parse_attach() {
    let parsed = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
    let Commands::Attach { name } = &parsed.command else {
        panic!("expected Attach command");
    };
    assert_eq!(name, "my-session");
}
2663
/// The short alias `a` parses into `Commands::Attach`.
#[test]
fn test_cli_parse_attach_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
    let Commands::Attach { name } = &parsed.command else {
        panic!("expected Attach command");
    };
    assert_eq!(name, "my-session");
}
2672
2673 #[tokio::test]
2674 async fn test_execute_attach_success() {
2675 let node = start_test_server().await;
2676 let cli = Cli {
2677 node,
2678 token: None,
2679 command: Commands::Attach {
2680 name: "test-session".into(),
2681 },
2682 };
2683 let result = execute(&cli).await.unwrap();
2684 assert!(result.contains("Detached from session test-session"));
2685 }
2686
2687 #[tokio::test]
2688 async fn test_execute_attach_with_backend_session_id() {
2689 use axum::{Router, routing::get};
2690 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","provider":"claude","prompt":"test","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2691 let app = Router::new().route(
2692 "/api/v1/sessions/{id}",
2693 get(move || async move { session_json.to_owned() }),
2694 );
2695 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2696 let addr = listener.local_addr().unwrap();
2697 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2698
2699 let cli = Cli {
2700 node: format!("127.0.0.1:{}", addr.port()),
2701 token: None,
2702 command: Commands::Attach {
2703 name: "my-session".into(),
2704 },
2705 };
2706 let result = execute(&cli).await.unwrap();
2707 assert!(result.contains("Detached from session my-session"));
2708 }
2709
2710 #[tokio::test]
2711 async fn test_execute_attach_connection_refused() {
2712 let cli = Cli {
2713 node: "localhost:1".into(),
2714 token: None,
2715 command: Commands::Attach {
2716 name: "test-session".into(),
2717 },
2718 };
2719 let result = execute(&cli).await;
2720 let err = result.unwrap_err().to_string();
2721 assert!(err.contains("Could not connect to pulpod"));
2722 }
2723
2724 #[tokio::test]
2725 async fn test_execute_attach_error_response() {
2726 use axum::{Router, http::StatusCode, routing::get};
2727 let app = Router::new().route(
2728 "/api/v1/sessions/{id}",
2729 get(|| async {
2730 (
2731 StatusCode::NOT_FOUND,
2732 r#"{"error":"session not found"}"#.to_owned(),
2733 )
2734 }),
2735 );
2736 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2737 let addr = listener.local_addr().unwrap();
2738 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2739
2740 let cli = Cli {
2741 node: format!("127.0.0.1:{}", addr.port()),
2742 token: None,
2743 command: Commands::Attach {
2744 name: "nonexistent".into(),
2745 },
2746 };
2747 let result = execute(&cli).await;
2748 let err = result.unwrap_err().to_string();
2749 assert!(err.contains("session not found"));
2750 }
2751
2752 #[tokio::test]
2753 async fn test_execute_attach_stale_session() {
2754 use axum::{Router, routing::get};
2755 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"stale-sess","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"stale-sess","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2756 let app = Router::new().route(
2757 "/api/v1/sessions/{id}",
2758 get(move || async move { session_json.to_owned() }),
2759 );
2760 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2761 let addr = listener.local_addr().unwrap();
2762 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2763
2764 let cli = Cli {
2765 node: format!("127.0.0.1:{}", addr.port()),
2766 token: None,
2767 command: Commands::Attach {
2768 name: "stale-sess".into(),
2769 },
2770 };
2771 let result = execute(&cli).await;
2772 let err = result.unwrap_err().to_string();
2773 assert!(err.contains("stale"));
2774 assert!(err.contains("pulpo resume"));
2775 }
2776
2777 #[tokio::test]
2778 async fn test_execute_attach_dead_session() {
2779 use axum::{Router, routing::get};
2780 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"dead-sess","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"dead-sess","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2781 let app = Router::new().route(
2782 "/api/v1/sessions/{id}",
2783 get(move || async move { session_json.to_owned() }),
2784 );
2785 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2786 let addr = listener.local_addr().unwrap();
2787 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2788
2789 let cli = Cli {
2790 node: format!("127.0.0.1:{}", addr.port()),
2791 token: None,
2792 command: Commands::Attach {
2793 name: "dead-sess".into(),
2794 },
2795 };
2796 let result = execute(&cli).await;
2797 let err = result.unwrap_err().to_string();
2798 assert!(err.contains("dead"));
2799 assert!(err.contains("cannot attach"));
2800 }
2801
/// The alias `s` resolves to `Commands::Spawn`.
#[test]
fn test_cli_parse_alias_spawn() {
    let argv = ["pulpo", "s", "--workdir", "/tmp", "Do it"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert!(matches!(&parsed.command, Commands::Spawn { .. }));
}
2809
/// The alias `ls` resolves to `Commands::List`.
#[test]
fn test_cli_parse_alias_list() {
    let parsed = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
    assert!(matches!(parsed.command, Commands::List));
}
2815
/// The alias `l` resolves to `Commands::Logs` with the given session name.
#[test]
fn test_cli_parse_alias_logs() {
    let parsed = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
    let Commands::Logs { name, .. } = &parsed.command else {
        panic!("expected Logs command");
    };
    assert_eq!(name, "my-session");
}
2824
/// The alias `k` resolves to `Commands::Kill` with the given session name.
#[test]
fn test_cli_parse_alias_kill() {
    let parsed = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
    let Commands::Kill { name } = &parsed.command else {
        panic!("expected Kill command");
    };
    assert_eq!(name, "my-session");
}
2833
/// The alias `rm` resolves to `Commands::Delete` with the given session name.
#[test]
fn test_cli_parse_alias_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
    let Commands::Delete { name } = &parsed.command else {
        panic!("expected Delete command");
    };
    assert_eq!(name, "my-session");
}
2842
/// The alias `r` resolves to `Commands::Resume` with the given session name.
#[test]
fn test_cli_parse_alias_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
    let Commands::Resume { name } = &parsed.command else {
        panic!("expected Resume command");
    };
    assert_eq!(name, "my-session");
}
2851
/// The alias `n` resolves to `Commands::Nodes`.
#[test]
fn test_cli_parse_alias_nodes() {
    let parsed = Cli::try_parse_from(["pulpo", "n"]).unwrap();
    assert!(matches!(parsed.command, Commands::Nodes));
}
2857
/// The alias `iv` resolves to `Commands::Interventions` with the given session name.
#[test]
fn test_cli_parse_alias_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
    let Commands::Interventions { name } = &parsed.command else {
        panic!("expected Interventions command");
    };
    assert_eq!(name, "my-session");
}
2866
/// A JSON body with an `error` field is reduced to just that field's text.
#[test]
fn test_api_error_json() {
    let body = "{\"error\":\"session not found: foo\"}";
    let rendered = api_error(body).to_string();
    assert_eq!(rendered, "session not found: foo");
}
2872
/// A non-JSON body is used verbatim as the error message.
#[test]
fn test_api_error_plain_text() {
    let rendered = api_error("plain text error").to_string();
    assert_eq!(rendered, "plain text error");
}
2878
/// With no previous output, the whole new output is returned.
#[test]
fn test_diff_output_empty_prev() {
    let current = "line1\nline2\n";
    assert_eq!(diff_output("", current), "line1\nline2\n");
}
2885
/// Identical snapshots produce an empty diff.
#[test]
fn test_diff_output_identical() {
    let snapshot = "line1\nline2";
    assert_eq!(diff_output(snapshot, snapshot), "");
}
2890
/// When the new snapshot extends the previous one, only the appended lines are returned.
#[test]
fn test_diff_output_new_lines_appended() {
    let delta = diff_output("line1\nline2", "line1\nline2\nline3\nline4");
    assert_eq!(delta, "line3\nline4");
}
2897
/// When the window has scrolled by one line (the new snapshot starts at the
/// previous snapshot's second line), only the newly visible line is returned.
#[test]
fn test_diff_output_scrolled_window() {
    let delta = diff_output("line1\nline2\nline3", "line2\nline3\nline4");
    assert_eq!(delta, "line4");
}
2905
/// With no lines in common, the entire new snapshot is returned.
#[test]
fn test_diff_output_completely_different() {
    let delta = diff_output("aaa\nbbb", "xxx\nyyy");
    assert_eq!(delta, "xxx\nyyy");
}
2912
/// The previous snapshot's last line ("common") also occurs in the new one,
/// but the lines before it differ; the entire new snapshot is returned.
#[test]
fn test_diff_output_last_line_matches_but_overlap_fails() {
    let delta = diff_output("aaa\ncommon", "zzz\ncommon\nnew_line");
    assert_eq!(delta, "zzz\ncommon\nnew_line");
}
2923
/// An empty new snapshot produces an empty diff.
#[test]
fn test_diff_output_new_empty() {
    let delta = diff_output("line1", "");
    assert_eq!(delta, "");
}
2928
2929 async fn start_follow_test_server() -> String {
2933 use axum::{Router, extract::Path, extract::Query, routing::get};
2934 use std::sync::Arc;
2935 use std::sync::atomic::{AtomicUsize, Ordering};
2936
2937 let call_count = Arc::new(AtomicUsize::new(0));
2938 let output_count = call_count.clone();
2939 let status_count = Arc::new(AtomicUsize::new(0));
2940 let status_count_inner = status_count.clone();
2941
2942 let app = Router::new()
2943 .route(
2944 "/api/v1/sessions/{id}/output",
2945 get(
2946 move |_path: Path<String>,
2947 _query: Query<std::collections::HashMap<String, String>>| {
2948 let count = output_count.clone();
2949 async move {
2950 let n = count.fetch_add(1, Ordering::SeqCst);
2951 let output = match n {
2952 0 => "line1\nline2".to_owned(),
2953 1 => "line1\nline2\nline3".to_owned(),
2954 _ => "line2\nline3\nline4".to_owned(),
2955 };
2956 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2957 }
2958 },
2959 ),
2960 )
2961 .route(
2962 "/api/v1/sessions/{id}",
2963 get(move |_path: Path<String>| {
2964 let count = status_count_inner.clone();
2965 async move {
2966 let n = count.fetch_add(1, Ordering::SeqCst);
2967 let status = if n < 2 { "running" } else { "completed" };
2968 format!(
2969 r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
2970 )
2971 }
2972 }),
2973 );
2974
2975 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2976 let addr = listener.local_addr().unwrap();
2977 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2978 format!("http://127.0.0.1:{}", addr.port())
2979 }
2980
2981 #[tokio::test]
2982 async fn test_follow_logs_polls_and_exits_on_completed() {
2983 let base = start_follow_test_server().await;
2984 let client = reqwest::Client::new();
2985 let mut buf = Vec::new();
2986
2987 follow_logs(&client, &base, "test", 100, None, &mut buf)
2988 .await
2989 .unwrap();
2990
2991 let output = String::from_utf8(buf).unwrap();
2992 assert!(output.contains("line1"));
2994 assert!(output.contains("line2"));
2995 assert!(output.contains("line3"));
2996 assert!(output.contains("line4"));
2997 }
2998
2999 #[tokio::test]
3000 async fn test_execute_logs_follow_success() {
3001 let base = start_follow_test_server().await;
3002 let node = base.strip_prefix("http://").unwrap().to_owned();
3004
3005 let cli = Cli {
3006 node,
3007 token: None,
3008 command: Commands::Logs {
3009 name: "test".into(),
3010 lines: 100,
3011 follow: true,
3012 },
3013 };
3014 let result = execute(&cli).await.unwrap();
3016 assert_eq!(result, "");
3017 }
3018
3019 #[tokio::test]
3020 async fn test_execute_logs_follow_connection_refused() {
3021 let cli = Cli {
3022 node: "localhost:1".into(),
3023 token: None,
3024 command: Commands::Logs {
3025 name: "test".into(),
3026 lines: 50,
3027 follow: true,
3028 },
3029 };
3030 let result = execute(&cli).await;
3031 let err = result.unwrap_err().to_string();
3032 assert!(
3033 err.contains("Could not connect to pulpod"),
3034 "Expected friendly error, got: {err}"
3035 );
3036 }
3037
3038 #[tokio::test]
3039 async fn test_follow_logs_exits_on_dead() {
3040 use axum::{Router, extract::Path, extract::Query, routing::get};
3041
3042 let app = Router::new()
3043 .route(
3044 "/api/v1/sessions/{id}/output",
3045 get(
3046 |_path: Path<String>,
3047 _query: Query<std::collections::HashMap<String, String>>| async {
3048 r#"{"output":"some output"}"#.to_owned()
3049 },
3050 ),
3051 )
3052 .route(
3053 "/api/v1/sessions/{id}",
3054 get(|_path: Path<String>| async {
3055 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3056 }),
3057 );
3058
3059 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3060 let addr = listener.local_addr().unwrap();
3061 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3062 let base = format!("http://127.0.0.1:{}", addr.port());
3063
3064 let client = reqwest::Client::new();
3065 let mut buf = Vec::new();
3066 follow_logs(&client, &base, "test", 100, None, &mut buf)
3067 .await
3068 .unwrap();
3069
3070 let output = String::from_utf8(buf).unwrap();
3071 assert!(output.contains("some output"));
3072 }
3073
3074 #[tokio::test]
3075 async fn test_follow_logs_exits_on_stale() {
3076 use axum::{Router, extract::Path, extract::Query, routing::get};
3077
3078 let app = Router::new()
3079 .route(
3080 "/api/v1/sessions/{id}/output",
3081 get(
3082 |_path: Path<String>,
3083 _query: Query<std::collections::HashMap<String, String>>| async {
3084 r#"{"output":"stale output"}"#.to_owned()
3085 },
3086 ),
3087 )
3088 .route(
3089 "/api/v1/sessions/{id}",
3090 get(|_path: Path<String>| async {
3091 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
3092 }),
3093 );
3094
3095 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3096 let addr = listener.local_addr().unwrap();
3097 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3098 let base = format!("http://127.0.0.1:{}", addr.port());
3099
3100 let client = reqwest::Client::new();
3101 let mut buf = Vec::new();
3102 follow_logs(&client, &base, "test", 100, None, &mut buf)
3103 .await
3104 .unwrap();
3105
3106 let output = String::from_utf8(buf).unwrap();
3107 assert!(output.contains("stale output"));
3108 }
3109
3110 #[tokio::test]
3111 async fn test_execute_logs_follow_non_reqwest_error() {
3112 use axum::{Router, extract::Path, extract::Query, routing::get};
3113
3114 let app = Router::new()
3116 .route(
3117 "/api/v1/sessions/{id}/output",
3118 get(
3119 |_path: Path<String>,
3120 _query: Query<std::collections::HashMap<String, String>>| async {
3121 r#"{"output":"initial"}"#.to_owned()
3122 },
3123 ),
3124 )
3125 .route(
3126 "/api/v1/sessions/{id}",
3127 get(|_path: Path<String>| async { "not valid json".to_owned() }),
3128 );
3129
3130 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3131 let addr = listener.local_addr().unwrap();
3132 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3133 let node = format!("127.0.0.1:{}", addr.port());
3134
3135 let cli = Cli {
3136 node,
3137 token: None,
3138 command: Commands::Logs {
3139 name: "test".into(),
3140 lines: 100,
3141 follow: true,
3142 },
3143 };
3144 let err = execute(&cli).await.unwrap_err();
3145 let msg = err.to_string();
3147 assert!(
3148 msg.contains("expected ident"),
3149 "Expected serde parse error, got: {msg}"
3150 );
3151 }
3152
/// `--max-turns`, `--max-budget` and `--output-format` are parsed into the
/// corresponding `Commands::Spawn` fields.
#[test]
fn test_cli_parse_spawn_with_guardrails() {
    let argv = [
        "pulpo",
        "spawn",
        "--workdir",
        "/tmp",
        "--max-turns",
        "10",
        "--max-budget",
        "5.5",
        "--output-format",
        "json",
        "Do it",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Spawn {
        max_turns,
        max_budget,
        output_format,
        ..
    } = &parsed.command
    else {
        panic!("expected Spawn command");
    };
    assert_eq!(*max_turns, Some(10));
    assert_eq!(*max_budget, Some(5.5));
    assert_eq!(output_format.as_deref(), Some("json"));
}
3176
3177 #[tokio::test]
3178 async fn test_fetch_session_status_connection_error() {
3179 let client = reqwest::Client::new();
3180 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3181 assert!(result.is_err());
3182 }
3183
/// `build_crontab_line` renders the cron expression, the `pulpo spawn`
/// invocation and a trailing `#pulpo:<name>` marker, ending with a newline.
#[test]
fn test_build_crontab_line() {
    let expected = "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n";
    let actual = build_crontab_line(
        "nightly-review",
        "0 3 * * *",
        "/home/me/repo",
        "claude",
        "Review PRs",
        "localhost:7433",
    );
    assert_eq!(actual, expected);
}
3201
/// Installing a new entry keeps the existing crontab content first and ends
/// with the new marker line.
#[test]
fn test_crontab_install_success() {
    let existing = "# existing cron\n0 * * * * echo hi\n";
    let new_line = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";
    let updated = crontab_install(existing, "my-job", new_line).unwrap();
    assert!(updated.starts_with("# existing cron\n"));
    assert!(updated.ends_with("#pulpo:my-job\n"));
    assert!(updated.contains("echo hi"));
}
3211
/// Installing a schedule whose name is already present fails with "already exists".
#[test]
fn test_crontab_install_duplicate_error() {
    let existing = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let new_line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
    let message = crontab_install(existing, "my-job", new_line)
        .unwrap_err()
        .to_string();
    assert!(message.contains("already exists"));
}
3219
/// An empty crontab lists as "No pulpo schedules.".
#[test]
fn test_crontab_list_empty() {
    let listing = crontab_list("");
    assert_eq!(listing, "No pulpo schedules.");
}
3224
/// A crontab with no `#pulpo:` entries lists as "No pulpo schedules.".
#[test]
fn test_crontab_list_no_pulpo_entries() {
    let listing = crontab_list("0 * * * * echo hi\n");
    assert_eq!(listing, "No pulpo schedules.");
}
3229
/// Listing an active entry shows the table headers, the schedule name, its
/// cron expression and the text "no".
#[test]
fn test_crontab_list_with_entries() {
    let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
    let listing = crontab_list(crontab);
    for fragment in ["NAME", "CRON", "PAUSED", "nightly", "0 3 * * *", "no"] {
        assert!(listing.contains(fragment));
    }
}
3241
/// An entry commented out with a leading `#` is listed by name together with "yes".
#[test]
fn test_crontab_list_paused_entry() {
    let listing = crontab_list("#0 3 * * * pulpo spawn task #pulpo:paused-job\n");
    assert!(listing.contains("paused-job"));
    assert!(listing.contains("yes"));
}
3249
/// A marker line with a single token before the `#pulpo:` marker is still
/// listed by name, and the listing contains a `?`.
#[test]
fn test_crontab_list_short_line() {
    let listing = crontab_list("badcron #pulpo:broken\n");
    assert!(listing.contains("broken"));
    assert!(listing.contains('?'));
}
3258
/// Removing a schedule drops its line and keeps the unrelated entry.
#[test]
fn test_crontab_remove_success() {
    let existing = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let updated = crontab_remove(existing, "my-job").unwrap();
    assert!(updated.contains("echo hi"));
    assert!(!updated.contains("my-job"));
}
3266
/// Removing an unknown schedule fails with "not found".
#[test]
fn test_crontab_remove_not_found() {
    let message = crontab_remove("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found"));
}
3273
/// Pausing an active schedule yields a crontab that starts with `#` and still
/// carries the marker.
#[test]
fn test_crontab_pause_success() {
    let existing = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let updated = crontab_pause(existing, "my-job").unwrap();
    assert!(updated.starts_with('#'));
    assert!(updated.contains("#pulpo:my-job"));
}
3281
/// Pausing an unknown schedule fails with "not found or already paused".
#[test]
fn test_crontab_pause_not_found() {
    let message = crontab_pause("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found or already paused"));
}
3288
/// Pausing an entry that is already commented out fails with "already paused".
#[test]
fn test_crontab_pause_already_paused() {
    let existing = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let message = crontab_pause(existing, "my-job").unwrap_err().to_string();
    assert!(message.contains("already paused"));
}
3295
/// Resuming a paused schedule yields a crontab that no longer starts with `#`
/// and still carries the marker.
#[test]
fn test_crontab_resume_success() {
    let existing = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let updated = crontab_resume(existing, "my-job").unwrap();
    assert!(!updated.starts_with('#'));
    assert!(updated.contains("#pulpo:my-job"));
}
3303
/// Resuming an unknown schedule fails with "not found or not paused".
#[test]
fn test_crontab_resume_not_found() {
    let message = crontab_resume("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found or not paused"));
}
3310
/// Resuming an entry that is not commented out fails with "not paused".
#[test]
fn test_crontab_resume_not_paused() {
    let existing = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let message = crontab_resume(existing, "my-job").unwrap_err().to_string();
    assert!(message.contains("not paused"));
}
3317
/// `schedule install` captures name, cron expression, workdir and the
/// multi-word prompt; with no `--provider` flag the provider is "claude".
#[test]
fn test_cli_parse_schedule_install() {
    let argv = [
        "pulpo",
        "schedule",
        "install",
        "nightly",
        "0 3 * * *",
        "--workdir",
        "/tmp/repo",
        "Review",
        "PRs",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_expected = matches!(
        &parsed.command,
        Commands::Schedule {
            action: ScheduleAction::Install { name, cron, workdir, provider, prompt }
        } if name == "nightly" && cron == "0 3 * * *" && workdir == "/tmp/repo"
            && provider == "claude" && prompt == &["Review", "PRs"]
    );
    assert!(is_expected);
}
3342
/// `schedule list` parses into `ScheduleAction::List`.
#[test]
fn test_cli_parse_schedule_list() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Commands::Schedule {
            action: ScheduleAction::List
        }
    );
    assert!(is_list);
}
3353
/// `schedule remove <name>` parses into `ScheduleAction::Remove`.
#[test]
fn test_cli_parse_schedule_remove() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Remove { name },
    } = &parsed.command
    else {
        panic!("expected Schedule/Remove command");
    };
    assert_eq!(name, "nightly");
}
3364
/// `schedule pause <name>` parses into `ScheduleAction::Pause`.
#[test]
fn test_cli_parse_schedule_pause() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Pause { name },
    } = &parsed.command
    else {
        panic!("expected Schedule/Pause command");
    };
    assert_eq!(name, "nightly");
}
3375
/// `schedule resume <name>` parses into `ScheduleAction::Resume`.
#[test]
fn test_cli_parse_schedule_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Resume { name },
    } = &parsed.command
    else {
        panic!("expected Schedule/Resume command");
    };
    assert_eq!(name, "nightly");
}
3386
/// The alias `sched` resolves to the `schedule` command.
#[test]
fn test_cli_parse_schedule_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Commands::Schedule {
            action: ScheduleAction::List
        }
    );
    assert!(is_list);
}
3397
/// Under `schedule`, the alias `ls` resolves to `ScheduleAction::List`.
#[test]
fn test_cli_parse_schedule_list_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
    let is_list = matches!(
        &parsed.command,
        Commands::Schedule {
            action: ScheduleAction::List
        }
    );
    assert!(is_list);
}
3408
/// Under `schedule`, the alias `rm` resolves to `ScheduleAction::Remove`.
#[test]
fn test_cli_parse_schedule_remove_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Remove { name },
    } = &parsed.command
    else {
        panic!("expected Schedule/Remove command");
    };
    assert_eq!(name, "nightly");
}
3419
/// `--provider` on `schedule install` sets the provider to the given value.
#[test]
fn test_cli_parse_schedule_install_custom_provider() {
    let argv = [
        "pulpo",
        "schedule",
        "install",
        "daily",
        "0 9 * * *",
        "--workdir",
        "/tmp",
        "--provider",
        "codex",
        "Run tests",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Install { provider, .. },
    } = &parsed.command
    else {
        panic!("expected Schedule/Install command");
    };
    assert_eq!(provider, "codex");
}
3442
3443 #[tokio::test]
3444 async fn test_execute_schedule_via_execute() {
3445 let node = start_test_server().await;
3447 let cli = Cli {
3448 node,
3449 token: None,
3450 command: Commands::Schedule {
3451 action: ScheduleAction::List,
3452 },
3453 };
3454 let result = execute(&cli).await;
3455 assert!(result.is_ok() || result.is_err());
3457 }
3458
/// The `Debug` rendering of `ScheduleAction::List` is "List".
#[test]
fn test_schedule_action_debug() {
    let action = ScheduleAction::List;
    let rendered = format!("{action:?}");
    assert_eq!(rendered, "List");
}
3464
/// A bare `knowledge` command parses into `Commands::Knowledge`.
#[test]
fn test_cli_parse_knowledge() {
    let parsed = Cli::try_parse_from(["pulpo", "knowledge"]).unwrap();
    let is_knowledge = matches!(parsed.command, Commands::Knowledge { .. });
    assert!(is_knowledge);
}
3472
/// The alias `kn` resolves to `Commands::Knowledge`.
#[test]
fn test_cli_parse_knowledge_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "kn"]).unwrap();
    let is_knowledge = matches!(parsed.command, Commands::Knowledge { .. });
    assert!(is_knowledge);
}
3478
/// `--kind`, `--repo`, `--ink` and `--limit` are parsed into the matching
/// `Commands::Knowledge` fields.
#[test]
fn test_cli_parse_knowledge_with_filters() {
    let argv = [
        "pulpo",
        "knowledge",
        "--kind",
        "failure",
        "--repo",
        "/tmp/repo",
        "--ink",
        "coder",
        "--limit",
        "5",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Knowledge {
        kind,
        repo,
        ink,
        limit,
        ..
    } = &parsed.command
    else {
        panic!("expected Knowledge command");
    };
    assert_eq!(kind.as_deref(), Some("failure"));
    assert_eq!(repo.as_deref(), Some("/tmp/repo"));
    assert_eq!(ink.as_deref(), Some("coder"));
    assert_eq!(*limit, 5);
}
3510
/// `--context` sets the flag, and `--repo` is captured alongside it.
#[test]
fn test_cli_parse_knowledge_context() {
    let argv = ["pulpo", "knowledge", "--context", "--repo", "/tmp/repo"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Knowledge { context, repo, .. } = &parsed.command else {
        panic!("expected Knowledge command");
    };
    assert!(*context);
    assert_eq!(repo.as_deref(), Some("/tmp/repo"));
}
3523
/// An empty item list renders as "No knowledge found.".
#[test]
fn test_format_knowledge_empty() {
    let rendered = format_knowledge(&[]);
    assert_eq!(rendered, "No knowledge found.");
}
3528
/// Rendering a summary item and a failure item shows the headers, both kinds,
/// the first title, the text "repo" and the relevance formatted as "0.70".
#[test]
fn test_format_knowledge_items() {
    use chrono::Utc;
    use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
    use uuid::Uuid;

    // Item scoped to both a repo and an ink.
    let summary = Knowledge {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        kind: KnowledgeKind::Summary,
        scope_repo: Some("/tmp/repo".into()),
        scope_ink: Some("coder".into()),
        title: "Fixed the auth bug".into(),
        body: "Details".into(),
        tags: vec!["claude".into(), "completed".into()],
        relevance: 0.7,
        created_at: Utc::now(),
    };
    // Item with no scope at all.
    let failure = Knowledge {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        kind: KnowledgeKind::Failure,
        scope_repo: None,
        scope_ink: None,
        title: "OOM crash during build".into(),
        body: "Details".into(),
        tags: vec!["failure".into()],
        relevance: 0.9,
        created_at: Utc::now(),
    };
    let items = vec![summary, failure];

    let rendered = format_knowledge(&items);
    let expected_fragments = [
        "KIND",
        "TITLE",
        "summary",
        "failure",
        "Fixed the auth bug",
        "repo",
        "0.70",
    ];
    for fragment in expected_fragments {
        assert!(rendered.contains(fragment));
    }
}
3571
/// An item with a very long title yields output containing an ellipsis
/// character (`…`); per the test name, this is the truncation marker.
#[test]
fn test_format_knowledge_long_title_truncated() {
    use chrono::Utc;
    use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
    use uuid::Uuid;

    let long_titled = Knowledge {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        kind: KnowledgeKind::Summary,
        scope_repo: Some("/repo".into()),
        scope_ink: None,
        title: "A very long title that exceeds the maximum display width for knowledge items in the CLI".into(),
        body: "Body".into(),
        tags: vec![],
        relevance: 0.5,
        created_at: Utc::now(),
    };
    let items = vec![long_titled];

    let table = format_knowledge(&items);
    assert!(table.contains('…'));
}
3594
/// `--get <id>` stores the given identifier in the `get` field.
#[test]
fn test_cli_parse_knowledge_get() {
    let argv = ["pulpo", "knowledge", "--get", "abc-123"];
    let parsed = Cli::try_parse_from(argv).unwrap();

    let Commands::Knowledge { get, .. } = &parsed.command else {
        panic!("expected Knowledge command");
    };
    assert_eq!(get.as_deref(), Some("abc-123"));
}
3605
/// `--delete <id>` stores the given identifier in the `delete` field.
#[test]
fn test_cli_parse_knowledge_delete() {
    let argv = ["pulpo", "knowledge", "--delete", "abc-123"];
    let parsed = Cli::try_parse_from(argv).unwrap();

    let Commands::Knowledge { delete, .. } = &parsed.command else {
        panic!("expected Knowledge command");
    };
    assert_eq!(delete.as_deref(), Some("abc-123"));
}
3616
/// `--push` sets the boolean `push` flag on the `Knowledge` command.
#[test]
fn test_cli_parse_knowledge_push() {
    let argv = ["pulpo", "knowledge", "--push"];
    let parsed = Cli::try_parse_from(argv).unwrap();

    let Commands::Knowledge { push, .. } = &parsed.command else {
        panic!("expected Knowledge command");
    };
    assert!(*push);
}
3627
/// An item with no repo scope still formats, and the output contains a `-`.
#[test]
fn test_format_knowledge_no_repo() {
    use chrono::Utc;
    use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
    use uuid::Uuid;

    let unscoped = Knowledge {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        kind: KnowledgeKind::Summary,
        scope_repo: None,
        scope_ink: None,
        title: "Global finding".into(),
        body: "Body".into(),
        tags: vec![],
        relevance: 0.5,
        created_at: Utc::now(),
    };
    let items = vec![unscoped];

    let table = format_knowledge(&items);
    // NOTE(review): presumably `-` is the placeholder shown for a missing
    // repo; the formatter is not visible here, so confirm that nothing else
    // in the output can satisfy this check on its own.
    assert!(table.contains('-'));
}
3650}