1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4 AuthTokenResponse, InterventionEventResponse, KnowledgeDeleteResponse, KnowledgeItemResponse,
5 KnowledgePushResponse, KnowledgeResponse, PeersResponse,
6};
7use pulpo_common::knowledge::Knowledge;
8use pulpo_common::session::Session;
9
10#[derive(Parser, Debug)]
11#[command(
12 name = "pulpo",
13 about = "Manage agent sessions across your machines",
14 version
15)]
16pub struct Cli {
17 #[arg(long, default_value = "localhost:7433")]
19 pub node: String,
20
21 #[arg(long)]
23 pub token: Option<String>,
24
25 #[command(subcommand)]
26 pub command: Commands,
27}
28
29#[derive(Subcommand, Debug)]
30#[allow(clippy::large_enum_variant)]
31pub enum Commands {
32 #[command(visible_alias = "a")]
34 Attach {
35 name: String,
37 },
38
39 #[command(visible_alias = "i")]
41 Input {
42 name: String,
44 text: Option<String>,
46 },
47
48 #[command(visible_alias = "s")]
50 Spawn {
51 #[arg(long)]
53 workdir: Option<String>,
54
55 #[arg(long)]
57 name: Option<String>,
58
59 #[arg(long)]
61 provider: Option<String>,
62
63 #[arg(long)]
65 auto: bool,
66
67 #[arg(long)]
69 unrestricted: bool,
70
71 #[arg(long)]
73 model: Option<String>,
74
75 #[arg(long)]
77 system_prompt: Option<String>,
78
79 #[arg(long, value_delimiter = ',')]
81 allowed_tools: Option<Vec<String>>,
82
83 #[arg(long)]
85 ink: Option<String>,
86
87 #[arg(long)]
89 max_turns: Option<u32>,
90
91 #[arg(long)]
93 max_budget: Option<f64>,
94
95 #[arg(long)]
97 output_format: Option<String>,
98
99 prompt: Vec<String>,
101 },
102
103 #[command(visible_alias = "ls")]
105 List,
106
107 #[command(visible_alias = "l")]
109 Logs {
110 name: String,
112
113 #[arg(long, default_value = "100")]
115 lines: usize,
116
117 #[arg(short, long)]
119 follow: bool,
120 },
121
122 #[command(visible_alias = "k")]
124 Kill {
125 name: String,
127 },
128
129 #[command(visible_alias = "rm")]
131 Delete {
132 name: String,
134 },
135
136 #[command(visible_alias = "r")]
138 Resume {
139 name: String,
141 },
142
143 #[command(visible_alias = "n")]
145 Nodes,
146
147 #[command(visible_alias = "iv")]
149 Interventions {
150 name: String,
152 },
153
154 Ui,
156
157 #[command(visible_alias = "kn")]
159 Knowledge {
160 #[arg(long)]
162 session: Option<String>,
163
164 #[arg(long)]
166 kind: Option<String>,
167
168 #[arg(long)]
170 repo: Option<String>,
171
172 #[arg(long)]
174 ink: Option<String>,
175
176 #[arg(long, default_value = "20")]
178 limit: usize,
179
180 #[arg(long)]
182 context: bool,
183
184 #[arg(long)]
186 get: Option<String>,
187
188 #[arg(long)]
190 delete: Option<String>,
191
192 #[arg(long)]
194 push: bool,
195 },
196
197 #[command(visible_alias = "sched")]
199 Schedule {
200 #[command(subcommand)]
201 action: ScheduleAction,
202 },
203}
204
205#[derive(Subcommand, Debug)]
206pub enum ScheduleAction {
207 Install {
209 name: String,
211 cron: String,
213 #[arg(long)]
215 workdir: String,
216 #[arg(long, default_value = "claude")]
218 provider: String,
219 prompt: Vec<String>,
221 },
222 #[command(alias = "ls")]
224 List,
225 #[command(alias = "rm")]
227 Remove {
228 name: String,
230 },
231 Pause {
233 name: String,
235 },
236 Resume {
238 name: String,
240 },
241}
242
/// Builds the plain-HTTP base URL for a node given as `host:port`.
pub fn base_url(node: &str) -> String {
    let mut url = String::from("http://");
    url.push_str(node);
    url
}
247
248#[derive(serde::Deserialize)]
250struct OutputResponse {
251 output: String,
252}
253
254fn format_sessions(sessions: &[Session]) -> String {
256 if sessions.is_empty() {
257 return "No sessions.".into();
258 }
259 let mut lines = vec![format!(
260 "{:<20} {:<12} {:<10} {:<14} {}",
261 "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
262 )];
263 for s in sessions {
264 let prompt_display = if s.prompt.len() > 40 {
265 format!("{}...", &s.prompt[..37])
266 } else {
267 s.prompt.clone()
268 };
269 let status_display = if s.waiting_for_input {
270 "waiting".to_owned()
271 } else {
272 s.status.to_string()
273 };
274 lines.push(format!(
275 "{:<20} {:<12} {:<10} {:<14} {}",
276 s.name, status_display, s.provider, s.mode, prompt_display
277 ));
278 }
279 lines.join("\n")
280}
281
282fn format_nodes(resp: &PeersResponse) -> String {
284 let mut lines = vec![format!(
285 "{:<20} {:<25} {:<10} {}",
286 "NAME", "ADDRESS", "STATUS", "SESSIONS"
287 )];
288 lines.push(format!(
289 "{:<20} {:<25} {:<10} {}",
290 resp.local.name, "(local)", "online", "-"
291 ));
292 for p in &resp.peers {
293 let sessions = p
294 .session_count
295 .map_or_else(|| "-".into(), |c| c.to_string());
296 lines.push(format!(
297 "{:<20} {:<25} {:<10} {}",
298 p.name, p.address, p.status, sessions
299 ));
300 }
301 lines.join("\n")
302}
303
304fn format_interventions(events: &[InterventionEventResponse]) -> String {
306 if events.is_empty() {
307 return "No intervention events.".into();
308 }
309 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
310 for e in events {
311 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
312 }
313 lines.join("\n")
314}
315
316fn format_knowledge(items: &[Knowledge]) -> String {
318 if items.is_empty() {
319 return "No knowledge found.".into();
320 }
321 let mut lines = vec![format!(
322 "{:<10} {:<40} {:<10} {:<6} {}",
323 "KIND", "TITLE", "REPO", "REL", "TAGS"
324 )];
325 for k in items {
326 let title = if k.title.len() > 38 {
327 format!("{}…", &k.title[..37])
328 } else {
329 k.title.clone()
330 };
331 let repo = k
332 .scope_repo
333 .as_deref()
334 .and_then(|r| r.rsplit('/').next())
335 .unwrap_or("-");
336 let tags = k.tags.join(",");
337 lines.push(format!(
338 "{:<10} {:<40} {:<10} {:<6.2} {}",
339 k.kind, title, repo, k.relevance, tags
340 ));
341 }
342 lines.join("\n")
343}
344
/// Builds (without running) the command that opens `url` with the platform's
/// opener: `open` on macOS, `xdg-open` on Linux and on every other target.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    let opener = if cfg!(target_os = "macos") {
        "open"
    } else {
        // Linux, plus the fallback for any other target.
        "xdg-open"
    };
    let mut command = std::process::Command::new(opener);
    command.arg(url);
    command
}
368
369#[cfg(not(coverage))]
371fn open_browser(url: &str) -> Result<()> {
372 build_open_command(url).status()?;
373 Ok(())
374}
375
376#[cfg(coverage)]
378fn open_browser(_url: &str) -> Result<()> {
379 Ok(())
380}
381
/// Builds (without running) `tmux attach-session -t <backend_session_id>`.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    let mut tmux = std::process::Command::new("tmux");
    tmux.arg("attach-session")
        .arg("-t")
        .arg(backend_session_id);
    tmux
}
390
391#[cfg(not(any(test, coverage)))]
393fn attach_session(backend_session_id: &str) -> Result<()> {
394 let status = build_attach_command(backend_session_id).status()?;
395 if !status.success() {
396 anyhow::bail!("attach failed with {status}");
397 }
398 Ok(())
399}
400
401#[cfg(any(test, coverage))]
403#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
404fn attach_session(_backend_session_id: &str) -> Result<()> {
405 Ok(())
406}
407
408fn api_error(text: &str) -> anyhow::Error {
410 serde_json::from_str::<serde_json::Value>(text)
411 .ok()
412 .and_then(|v| v["error"].as_str().map(String::from))
413 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
414}
415
416async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
418 if resp.status().is_success() {
419 Ok(resp.text().await?)
420 } else {
421 let text = resp.text().await?;
422 Err(api_error(&text))
423 }
424}
425
426fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
428 if err.is_connect() {
429 anyhow::anyhow!(
430 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
431 )
432 } else {
433 anyhow::anyhow!("Network error connecting to {node}: {err}")
434 }
435}
436
/// Reports whether `node` (a `host[:port]` string) refers to this machine.
///
/// Recognised forms are `localhost`, `127.0.0.1`, a bare `::1`, and anything
/// starting with the bracketed `[::1]`.
fn is_localhost(node: &str) -> bool {
    // IPv6 loopback is checked on the whole string because splitting on ':'
    // would tear the address apart.
    if node == "::1" || node.starts_with("[::1]") {
        return true;
    }
    let host = match node.split_once(':') {
        Some((before_port, _)) => before_port,
        None => node,
    };
    matches!(host, "localhost" | "127.0.0.1")
}
442
443async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
445 let resp = client
446 .get(format!("{base}/api/v1/auth/token"))
447 .send()
448 .await
449 .ok()?;
450 let body: AuthTokenResponse = resp.json().await.ok()?;
451 if body.token.is_empty() {
452 None
453 } else {
454 Some(body.token)
455 }
456}
457
458async fn resolve_token(
460 client: &reqwest::Client,
461 base: &str,
462 node: &str,
463 explicit: Option<&str>,
464) -> Option<String> {
465 if let Some(t) = explicit {
466 return Some(t.to_owned());
467 }
468 if is_localhost(node) {
469 return discover_token(client, base).await;
470 }
471 None
472}
473
474fn authed_get(
476 client: &reqwest::Client,
477 url: String,
478 token: Option<&str>,
479) -> reqwest::RequestBuilder {
480 let req = client.get(url);
481 if let Some(t) = token {
482 req.bearer_auth(t)
483 } else {
484 req
485 }
486}
487
488fn authed_post(
490 client: &reqwest::Client,
491 url: String,
492 token: Option<&str>,
493) -> reqwest::RequestBuilder {
494 let req = client.post(url);
495 if let Some(t) = token {
496 req.bearer_auth(t)
497 } else {
498 req
499 }
500}
501
502fn authed_delete(
504 client: &reqwest::Client,
505 url: String,
506 token: Option<&str>,
507) -> reqwest::RequestBuilder {
508 let req = client.delete(url);
509 if let Some(t) = token {
510 req.bearer_auth(t)
511 } else {
512 req
513 }
514}
515
516async fn fetch_output(
518 client: &reqwest::Client,
519 base: &str,
520 name: &str,
521 lines: usize,
522 token: Option<&str>,
523) -> Result<String> {
524 let resp = authed_get(
525 client,
526 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
527 token,
528 )
529 .send()
530 .await?;
531 let text = ok_or_api_error(resp).await?;
532 let output: OutputResponse = serde_json::from_str(&text)?;
533 Ok(output.output)
534}
535
536async fn fetch_session_status(
538 client: &reqwest::Client,
539 base: &str,
540 name: &str,
541 token: Option<&str>,
542) -> Result<String> {
543 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
544 .send()
545 .await?;
546 let text = ok_or_api_error(resp).await?;
547 let session: Session = serde_json::from_str(&text)?;
548 Ok(session.status.to_string())
549}
550
/// Returns the part of `new` that was not already present at the end of `prev`.
///
/// Both arguments are snapshots of the same scrolling output. The function
/// looks for the latest position in `new` where the tail of `prev` reappears
/// line-for-line and returns everything after it:
///
/// * `prev` empty: all of `new` is returned.
/// * `new` has no lines: `""` is returned.
/// * the overlap ends on the last line of `new`: `""` (nothing new).
/// * no overlap is found: all of `new` is returned.
///
/// The returned slice always starts at a line boundary of `new`, for both
/// `\n` and `\r\n` line endings.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();

    if new_lines.is_empty() {
        return "";
    }

    // `prev` is non-empty, so `lines()` produced at least one entry.
    let last_prev = prev_lines[prev_lines.len() - 1];

    // Scan from the end so the most recent occurrence of the anchor line wins.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Compare as many trailing lines of `prev` as fit before position `i`.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail != new_overlap {
            continue;
        }
        if i + 1 == new_lines.len() {
            return "";
        }
        // Measure the consumed prefix on the raw text, terminators included.
        // Adding `len + 1` per stripped line would undercount `\r\n` endings
        // and start the result in the middle of an already-shown line ending.
        let consumed: usize = new
            .split_inclusive('\n')
            .take(i + 1)
            .map(str::len)
            .sum();
        return new.get(consumed..).unwrap_or("");
    }

    new
}
593
594async fn follow_logs(
596 client: &reqwest::Client,
597 base: &str,
598 name: &str,
599 lines: usize,
600 token: Option<&str>,
601 writer: &mut (dyn std::io::Write + Send),
602) -> Result<()> {
603 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
604 write!(writer, "{prev_output}")?;
605
606 loop {
607 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
608
609 let status = fetch_session_status(client, base, name, token).await?;
611 let is_terminal = status == "completed" || status == "dead" || status == "stale";
612
613 let new_output = fetch_output(client, base, name, lines, token).await?;
615
616 let diff = diff_output(&prev_output, &new_output);
617 if !diff.is_empty() {
618 write!(writer, "{diff}")?;
619 }
620 prev_output = new_output;
621
622 if is_terminal {
623 break;
624 }
625 }
626 Ok(())
627}
628
/// Marker placed before the schedule name at the end of every crontab line
/// managed by pulpo. It starts with `#` and is appended after the command.
#[cfg_attr(coverage, allow(dead_code))]
const CRONTAB_TAG: &str = "#pulpo:";
633
634#[cfg(not(coverage))]
636fn read_crontab() -> Result<String> {
637 let output = std::process::Command::new("crontab").arg("-l").output()?;
638 if output.status.success() {
639 Ok(String::from_utf8_lossy(&output.stdout).to_string())
640 } else {
641 Ok(String::new())
642 }
643}
644
645#[cfg(not(coverage))]
647fn write_crontab(content: &str) -> Result<()> {
648 use std::io::Write;
649 let mut child = std::process::Command::new("crontab")
650 .arg("-")
651 .stdin(std::process::Stdio::piped())
652 .spawn()?;
653 child
654 .stdin
655 .as_mut()
656 .unwrap()
657 .write_all(content.as_bytes())?;
658 let status = child.wait()?;
659 if !status.success() {
660 anyhow::bail!("crontab write failed");
661 }
662 Ok(())
663}
664
665#[cfg_attr(coverage, allow(dead_code))]
667fn build_crontab_line(
668 name: &str,
669 cron: &str,
670 workdir: &str,
671 provider: &str,
672 prompt: &str,
673 node: &str,
674) -> String {
675 format!(
676 "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
677 )
678}
679
680#[cfg_attr(coverage, allow(dead_code))]
682fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
683 let tag = format!("{CRONTAB_TAG}{name}");
684 if crontab.contains(&tag) {
685 anyhow::bail!("schedule \"{name}\" already exists — remove it first");
686 }
687 let mut result = crontab.to_owned();
688 result.push_str(line);
689 Ok(result)
690}
691
692#[cfg_attr(coverage, allow(dead_code))]
694fn crontab_list(crontab: &str) -> String {
695 let entries: Vec<&str> = crontab
696 .lines()
697 .filter(|l| l.contains(CRONTAB_TAG))
698 .collect();
699 if entries.is_empty() {
700 return "No pulpo schedules.".into();
701 }
702 let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
703 for entry in entries {
704 let paused = entry.starts_with('#');
705 let raw = entry.trim_start_matches('#').trim();
706 let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
707 let parts: Vec<&str> = raw.splitn(6, ' ').collect();
708 let cron_expr = if parts.len() >= 5 {
709 parts[..5].join(" ")
710 } else {
711 "?".into()
712 };
713 lines.push(format!(
714 "{:<20} {:<15} {}",
715 name,
716 cron_expr,
717 if paused { "yes" } else { "no" }
718 ));
719 }
720 lines.join("\n")
721}
722
723#[cfg_attr(coverage, allow(dead_code))]
725fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
726 use std::fmt::Write;
727 let tag = format!("{CRONTAB_TAG}{name}");
728 let filtered =
729 crontab
730 .lines()
731 .filter(|l| !l.contains(&tag))
732 .fold(String::new(), |mut acc, l| {
733 writeln!(acc, "{l}").unwrap();
734 acc
735 });
736 if filtered.len() == crontab.len() {
737 anyhow::bail!("schedule \"{name}\" not found");
738 }
739 Ok(filtered)
740}
741
742#[cfg_attr(coverage, allow(dead_code))]
744fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
745 use std::fmt::Write;
746 let tag = format!("{CRONTAB_TAG}{name}");
747 let mut found = false;
748 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
749 if l.contains(&tag) && !l.starts_with('#') {
750 found = true;
751 writeln!(acc, "#{l}").unwrap();
752 } else {
753 writeln!(acc, "{l}").unwrap();
754 }
755 acc
756 });
757 if !found {
758 anyhow::bail!("schedule \"{name}\" not found or already paused");
759 }
760 Ok(updated)
761}
762
763#[cfg_attr(coverage, allow(dead_code))]
765fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
766 use std::fmt::Write;
767 let tag = format!("{CRONTAB_TAG}{name}");
768 let mut found = false;
769 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
770 if l.contains(&tag) && l.starts_with('#') {
771 found = true;
772 writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
773 } else {
774 writeln!(acc, "{l}").unwrap();
775 }
776 acc
777 });
778 if !found {
779 anyhow::bail!("schedule \"{name}\" not found or not paused");
780 }
781 Ok(updated)
782}
783
784#[cfg(not(coverage))]
786fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
787 match action {
788 ScheduleAction::Install {
789 name,
790 cron,
791 workdir,
792 provider,
793 prompt,
794 } => {
795 let crontab = read_crontab()?;
796 let joined_prompt = prompt.join(" ");
797 let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
798 let updated = crontab_install(&crontab, name, &line)?;
799 write_crontab(&updated)?;
800 Ok(format!("Installed schedule \"{name}\""))
801 }
802 ScheduleAction::List => {
803 let crontab = read_crontab()?;
804 Ok(crontab_list(&crontab))
805 }
806 ScheduleAction::Remove { name } => {
807 let crontab = read_crontab()?;
808 let updated = crontab_remove(&crontab, name)?;
809 write_crontab(&updated)?;
810 Ok(format!("Removed schedule \"{name}\""))
811 }
812 ScheduleAction::Pause { name } => {
813 let crontab = read_crontab()?;
814 let updated = crontab_pause(&crontab, name)?;
815 write_crontab(&updated)?;
816 Ok(format!("Paused schedule \"{name}\""))
817 }
818 ScheduleAction::Resume { name } => {
819 let crontab = read_crontab()?;
820 let updated = crontab_resume(&crontab, name)?;
821 write_crontab(&updated)?;
822 Ok(format!("Resumed schedule \"{name}\""))
823 }
824 }
825}
826
827#[cfg(coverage)]
829fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
830 Ok(String::new())
831}
832
833#[allow(clippy::too_many_lines)]
835pub async fn execute(cli: &Cli) -> Result<String> {
836 let url = base_url(&cli.node);
837 let client = reqwest::Client::new();
838 let node = &cli.node;
839 let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;
840
841 match &cli.command {
842 Commands::Attach { name } => {
843 let resp = authed_get(
845 &client,
846 format!("{url}/api/v1/sessions/{name}"),
847 token.as_deref(),
848 )
849 .send()
850 .await
851 .map_err(|e| friendly_error(&e, node))?;
852 let text = ok_or_api_error(resp).await?;
853 let session: Session = serde_json::from_str(&text)?;
854 let backend_id = session
855 .backend_session_id
856 .unwrap_or_else(|| format!("pulpo-{name}"));
857 attach_session(&backend_id)?;
858 Ok(format!("Detached from session {name}."))
859 }
860 Commands::Input { name, text } => {
861 let input_text = text.as_deref().unwrap_or("\n");
862 let body = serde_json::json!({ "text": input_text });
863 let resp = authed_post(
864 &client,
865 format!("{url}/api/v1/sessions/{name}/input"),
866 token.as_deref(),
867 )
868 .json(&body)
869 .send()
870 .await
871 .map_err(|e| friendly_error(&e, node))?;
872 ok_or_api_error(resp).await?;
873 Ok(format!("Sent input to session {name}."))
874 }
875 Commands::List => {
876 let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
877 .send()
878 .await
879 .map_err(|e| friendly_error(&e, node))?;
880 let text = ok_or_api_error(resp).await?;
881 let sessions: Vec<Session> = serde_json::from_str(&text)?;
882 Ok(format_sessions(&sessions))
883 }
884 Commands::Nodes => {
885 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
886 .send()
887 .await
888 .map_err(|e| friendly_error(&e, node))?;
889 let text = ok_or_api_error(resp).await?;
890 let resp: PeersResponse = serde_json::from_str(&text)?;
891 Ok(format_nodes(&resp))
892 }
893 Commands::Spawn {
894 workdir,
895 name,
896 provider,
897 auto,
898 unrestricted,
899 model,
900 system_prompt,
901 allowed_tools,
902 ink,
903 max_turns,
904 max_budget,
905 output_format,
906 prompt,
907 } => {
908 let prompt_text = prompt.join(" ");
909 let mode = if *auto { "autonomous" } else { "interactive" };
910 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
912 std::env::current_dir()
913 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
914 });
915 let mut body = serde_json::json!({
916 "workdir": resolved_workdir,
917 "mode": mode,
918 });
919 if !prompt_text.is_empty() {
921 body["prompt"] = serde_json::json!(prompt_text);
922 }
923 if let Some(p) = provider {
925 body["provider"] = serde_json::json!(p);
926 }
927 if *unrestricted {
928 body["unrestricted"] = serde_json::json!(true);
929 }
930 if let Some(n) = name {
931 body["name"] = serde_json::json!(n);
932 }
933 if let Some(m) = model {
934 body["model"] = serde_json::json!(m);
935 }
936 if let Some(sp) = system_prompt {
937 body["system_prompt"] = serde_json::json!(sp);
938 }
939 if let Some(tools) = allowed_tools {
940 body["allowed_tools"] = serde_json::json!(tools);
941 }
942 if let Some(p) = ink {
943 body["ink"] = serde_json::json!(p);
944 }
945 if let Some(mt) = max_turns {
946 body["max_turns"] = serde_json::json!(mt);
947 }
948 if let Some(mb) = max_budget {
949 body["max_budget_usd"] = serde_json::json!(mb);
950 }
951 if let Some(of) = output_format {
952 body["output_format"] = serde_json::json!(of);
953 }
954 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
955 .json(&body)
956 .send()
957 .await
958 .map_err(|e| friendly_error(&e, node))?;
959 let text = ok_or_api_error(resp).await?;
960 let session: Session = serde_json::from_str(&text)?;
961 Ok(format!(
962 "Created session \"{}\" ({})",
963 session.name, session.id
964 ))
965 }
966 Commands::Kill { name } => {
967 let resp = authed_post(
968 &client,
969 format!("{url}/api/v1/sessions/{name}/kill"),
970 token.as_deref(),
971 )
972 .send()
973 .await
974 .map_err(|e| friendly_error(&e, node))?;
975 ok_or_api_error(resp).await?;
976 Ok(format!("Session {name} killed."))
977 }
978 Commands::Delete { name } => {
979 let resp = authed_delete(
980 &client,
981 format!("{url}/api/v1/sessions/{name}"),
982 token.as_deref(),
983 )
984 .send()
985 .await
986 .map_err(|e| friendly_error(&e, node))?;
987 ok_or_api_error(resp).await?;
988 Ok(format!("Session {name} deleted."))
989 }
990 Commands::Logs {
991 name,
992 lines,
993 follow,
994 } => {
995 if *follow {
996 let mut stdout = std::io::stdout();
997 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
998 .await
999 .map_err(|e| {
1000 match e.downcast::<reqwest::Error>() {
1002 Ok(re) => friendly_error(&re, node),
1003 Err(other) => other,
1004 }
1005 })?;
1006 Ok(String::new())
1007 } else {
1008 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1009 .await
1010 .map_err(|e| match e.downcast::<reqwest::Error>() {
1011 Ok(re) => friendly_error(&re, node),
1012 Err(other) => other,
1013 })?;
1014 Ok(output)
1015 }
1016 }
1017 Commands::Interventions { name } => {
1018 let resp = authed_get(
1019 &client,
1020 format!("{url}/api/v1/sessions/{name}/interventions"),
1021 token.as_deref(),
1022 )
1023 .send()
1024 .await
1025 .map_err(|e| friendly_error(&e, node))?;
1026 let text = ok_or_api_error(resp).await?;
1027 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1028 Ok(format_interventions(&events))
1029 }
1030 Commands::Ui => {
1031 let dashboard = base_url(&cli.node);
1032 open_browser(&dashboard)?;
1033 Ok(format!("Opening {dashboard}"))
1034 }
1035 Commands::Resume { name } => {
1036 let resp = authed_post(
1037 &client,
1038 format!("{url}/api/v1/sessions/{name}/resume"),
1039 token.as_deref(),
1040 )
1041 .send()
1042 .await
1043 .map_err(|e| friendly_error(&e, node))?;
1044 let text = ok_or_api_error(resp).await?;
1045 let session: Session = serde_json::from_str(&text)?;
1046 Ok(format!("Resumed session \"{}\"", session.name))
1047 }
1048 Commands::Knowledge {
1049 session,
1050 kind,
1051 repo,
1052 ink,
1053 limit,
1054 context,
1055 get,
1056 delete,
1057 push,
1058 } => {
1059 if let Some(id) = get {
1061 let endpoint = format!("{url}/api/v1/knowledge/{id}");
1062 let resp = authed_get(&client, endpoint, token.as_deref())
1063 .send()
1064 .await
1065 .map_err(|e| friendly_error(&e, node))?;
1066 let text = ok_or_api_error(resp).await?;
1067 let resp: KnowledgeItemResponse = serde_json::from_str(&text)?;
1068 return Ok(format_knowledge(&[resp.knowledge]));
1069 }
1070
1071 if let Some(id) = delete {
1073 let endpoint = format!("{url}/api/v1/knowledge/{id}");
1074 let resp = authed_delete(&client, endpoint, token.as_deref())
1075 .send()
1076 .await
1077 .map_err(|e| friendly_error(&e, node))?;
1078 let text = ok_or_api_error(resp).await?;
1079 let resp: KnowledgeDeleteResponse = serde_json::from_str(&text)?;
1080 return Ok(if resp.deleted {
1081 format!("Deleted knowledge item {id}")
1082 } else {
1083 format!("Knowledge item {id} not found")
1084 });
1085 }
1086
1087 if *push {
1089 let endpoint = format!("{url}/api/v1/knowledge/push");
1090 let resp = authed_post(&client, endpoint, token.as_deref())
1091 .send()
1092 .await
1093 .map_err(|e| friendly_error(&e, node))?;
1094 let text = ok_or_api_error(resp).await?;
1095 let resp: KnowledgePushResponse = serde_json::from_str(&text)?;
1096 return Ok(resp.message);
1097 }
1098
1099 let mut params = vec![format!("limit={limit}")];
1101 let endpoint = if *context {
1102 if let Some(r) = repo {
1103 params.push(format!("workdir={r}"));
1104 }
1105 if let Some(i) = ink {
1106 params.push(format!("ink={i}"));
1107 }
1108 format!("{url}/api/v1/knowledge/context?{}", params.join("&"))
1109 } else {
1110 if let Some(s) = session {
1111 params.push(format!("session_id={s}"));
1112 }
1113 if let Some(k) = kind {
1114 params.push(format!("kind={k}"));
1115 }
1116 if let Some(r) = repo {
1117 params.push(format!("repo={r}"));
1118 }
1119 if let Some(i) = ink {
1120 params.push(format!("ink={i}"));
1121 }
1122 format!("{url}/api/v1/knowledge?{}", params.join("&"))
1123 };
1124 let resp = authed_get(&client, endpoint, token.as_deref())
1125 .send()
1126 .await
1127 .map_err(|e| friendly_error(&e, node))?;
1128 let text = ok_or_api_error(resp).await?;
1129 let resp: KnowledgeResponse = serde_json::from_str(&text)?;
1130 Ok(format_knowledge(&resp.knowledge))
1131 }
1132 Commands::Schedule { action } => execute_schedule(action, node),
1133 }
1134}
1135
1136#[cfg(test)]
1137mod tests {
1138 use super::*;
1139
1140 #[test]
1141 fn test_base_url() {
1142 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1143 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1144 }
1145
1146 #[test]
1147 fn test_cli_parse_list() {
1148 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1149 assert_eq!(cli.node, "localhost:7433");
1150 assert!(matches!(cli.command, Commands::List));
1151 }
1152
1153 #[test]
1154 fn test_cli_parse_nodes() {
1155 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1156 assert!(matches!(cli.command, Commands::Nodes));
1157 }
1158
1159 #[test]
1160 fn test_cli_parse_ui() {
1161 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1162 assert!(matches!(cli.command, Commands::Ui));
1163 }
1164
1165 #[test]
1166 fn test_cli_parse_ui_custom_node() {
1167 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1168 assert!(matches!(cli.command, Commands::Ui));
1169 assert_eq!(cli.node, "mac-mini:7433");
1170 }
1171
1172 #[test]
1173 fn test_build_open_command() {
1174 let cmd = build_open_command("http://localhost:7433");
1175 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1176 assert_eq!(args, vec!["http://localhost:7433"]);
1177 #[cfg(target_os = "macos")]
1178 assert_eq!(cmd.get_program(), "open");
1179 #[cfg(target_os = "linux")]
1180 assert_eq!(cmd.get_program(), "xdg-open");
1181 }
1182
1183 #[test]
1184 fn test_cli_parse_spawn() {
1185 let cli = Cli::try_parse_from([
1186 "pulpo",
1187 "spawn",
1188 "--workdir",
1189 "/tmp/repo",
1190 "Fix",
1191 "the",
1192 "bug",
1193 ])
1194 .unwrap();
1195 assert!(matches!(
1196 &cli.command,
1197 Commands::Spawn { workdir, provider, auto, unrestricted, prompt, .. }
1198 if workdir.as_deref() == Some("/tmp/repo") && provider.is_none() && !auto
1199 && !unrestricted && prompt == &["Fix", "the", "bug"]
1200 ));
1201 }
1202
1203 #[test]
1204 fn test_cli_parse_spawn_with_provider() {
1205 let cli = Cli::try_parse_from([
1206 "pulpo",
1207 "spawn",
1208 "--workdir",
1209 "/tmp",
1210 "--provider",
1211 "codex",
1212 "Do it",
1213 ])
1214 .unwrap();
1215 assert!(matches!(
1216 &cli.command,
1217 Commands::Spawn { provider, .. } if provider.as_deref() == Some("codex")
1218 ));
1219 }
1220
1221 #[test]
1222 fn test_cli_parse_spawn_auto() {
1223 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "--auto", "Do it"])
1224 .unwrap();
1225 assert!(matches!(
1226 &cli.command,
1227 Commands::Spawn { auto, .. } if *auto
1228 ));
1229 }
1230
1231 #[test]
1232 fn test_cli_parse_spawn_unrestricted() {
1233 let cli = Cli::try_parse_from([
1234 "pulpo",
1235 "spawn",
1236 "--workdir",
1237 "/tmp",
1238 "--unrestricted",
1239 "Do it",
1240 ])
1241 .unwrap();
1242 assert!(matches!(
1243 &cli.command,
1244 Commands::Spawn { unrestricted, .. } if *unrestricted
1245 ));
1246 }
1247
1248 #[test]
1249 fn test_cli_parse_spawn_unrestricted_default() {
1250 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "Do it"]).unwrap();
1251 assert!(matches!(
1252 &cli.command,
1253 Commands::Spawn { unrestricted, .. } if !unrestricted
1254 ));
1255 }
1256
1257 #[test]
1258 fn test_cli_parse_spawn_with_name() {
1259 let cli = Cli::try_parse_from([
1260 "pulpo",
1261 "spawn",
1262 "--workdir",
1263 "/tmp/repo",
1264 "--name",
1265 "my-task",
1266 "Fix it",
1267 ])
1268 .unwrap();
1269 assert!(matches!(
1270 &cli.command,
1271 Commands::Spawn { workdir, name, .. }
1272 if workdir.as_deref() == Some("/tmp/repo") && name.as_deref() == Some("my-task")
1273 ));
1274 }
1275
1276 #[test]
1277 fn test_cli_parse_spawn_without_name() {
1278 let cli =
1279 Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"]).unwrap();
1280 assert!(matches!(
1281 &cli.command,
1282 Commands::Spawn { name, .. } if name.is_none()
1283 ));
1284 }
1285
1286 #[test]
1287 fn test_cli_parse_logs() {
1288 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1289 assert!(matches!(
1290 &cli.command,
1291 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1292 ));
1293 }
1294
1295 #[test]
1296 fn test_cli_parse_logs_with_lines() {
1297 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1298 assert!(matches!(
1299 &cli.command,
1300 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 50 && !follow
1301 ));
1302 }
1303
1304 #[test]
1305 fn test_cli_parse_logs_follow() {
1306 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1307 assert!(matches!(
1308 &cli.command,
1309 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1310 ));
1311 }
1312
1313 #[test]
1314 fn test_cli_parse_logs_follow_short() {
1315 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1316 assert!(matches!(
1317 &cli.command,
1318 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1319 ));
1320 }
1321
1322 #[test]
1323 fn test_cli_parse_kill() {
1324 let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1325 assert!(matches!(
1326 &cli.command,
1327 Commands::Kill { name } if name == "my-session"
1328 ));
1329 }
1330
1331 #[test]
1332 fn test_cli_parse_delete() {
1333 let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1334 assert!(matches!(
1335 &cli.command,
1336 Commands::Delete { name } if name == "my-session"
1337 ));
1338 }
1339
1340 #[test]
1341 fn test_cli_parse_resume() {
1342 let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1343 assert!(matches!(
1344 &cli.command,
1345 Commands::Resume { name } if name == "my-session"
1346 ));
1347 }
1348
/// `input <name> <text>` captures both the session name and the text to send.
#[test]
fn test_cli_parse_input() {
    let parsed = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
    let Commands::Input { name, text } = &parsed.command else {
        panic!("expected Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("yes"));
}
1357
/// Omitting the text argument leaves `text` as `None`.
#[test]
fn test_cli_parse_input_no_text() {
    let parsed = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
    let Commands::Input { name, text } = &parsed.command else {
        panic!("expected Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(text.is_none());
}
1366
/// The visible alias `i` resolves to the `input` subcommand.
#[test]
fn test_cli_parse_input_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
    let Commands::Input { name, text } = &parsed.command else {
        panic!("expected Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("y"));
}
1375
/// A global `--node` value replaces the default node address.
#[test]
fn test_cli_parse_custom_node() {
    let argv = ["pulpo", "--node", "win-pc:8080", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert_eq!(parsed.node.as_str(), "win-pc:8080");
}
1381
/// `--version` is reported by clap as an `Err` of kind `DisplayVersion`, not as a parsed `Cli`.
#[test]
fn test_cli_version() {
    let kind = Cli::try_parse_from(["pulpo", "--version"])
        .unwrap_err()
        .kind();
    assert_eq!(kind, clap::error::ErrorKind::DisplayVersion);
}
1389
/// Invoking the binary without a subcommand is a parse error.
#[test]
fn test_cli_parse_no_subcommand_fails() {
    assert!(Cli::try_parse_from(["pulpo"]).is_err());
}
1395
/// The derived `Debug` output of a parsed `Cli` mentions the selected subcommand.
#[test]
fn test_cli_debug() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    assert!(format!("{parsed:?}").contains("List"));
}
1402
/// The unit variant `Commands::List` debug-formats as its bare name.
#[test]
fn test_commands_debug() {
    let rendered = format!("{:?}", Commands::List);
    assert_eq!(rendered, "List");
}
1408
/// Canned JSON body describing one session (name `repo`, status `running`), used by the
/// stub routes in `start_test_server`. Split across lines for readability; `concat!` joins
/// the pieces into a single literal with no separators.
const TEST_SESSION_JSON: &str = concat!(
    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","#,
    r#""provider":"claude","prompt":"Fix bug","status":"running","mode":"interactive","#,
    r#""conversation_id":null,"exit_code":null,"backend_session_id":null,"#,
    r#""output_snapshot":null,"guard_config":null,"intervention_reason":null,"#,
    r#""intervention_at":null,"last_output_at":null,"waiting_for_input":false,"#,
    r#""created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#,
);
1411
1412 async fn start_test_server() -> String {
1414 use axum::http::StatusCode;
1415 use axum::{
1416 Json, Router,
1417 routing::{get, post},
1418 };
1419
1420 let session_json = TEST_SESSION_JSON;
1421
1422 let app = Router::new()
1423 .route(
1424 "/api/v1/sessions",
1425 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1426 (StatusCode::CREATED, session_json.to_owned())
1427 }),
1428 )
1429 .route(
1430 "/api/v1/sessions/{id}",
1431 get(move || async move { session_json.to_owned() })
1432 .delete(|| async { StatusCode::NO_CONTENT }),
1433 )
1434 .route(
1435 "/api/v1/sessions/{id}/kill",
1436 post(|| async { StatusCode::NO_CONTENT }),
1437 )
1438 .route(
1439 "/api/v1/sessions/{id}/output",
1440 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1441 )
1442 .route(
1443 "/api/v1/peers",
1444 get(|| async {
1445 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1446 }),
1447 )
1448 .route(
1449 "/api/v1/sessions/{id}/resume",
1450 axum::routing::post(move || async move { session_json.to_owned() }),
1451 )
1452 .route(
1453 "/api/v1/sessions/{id}/interventions",
1454 get(|| async { "[]".to_owned() }),
1455 )
1456 .route(
1457 "/api/v1/sessions/{id}/input",
1458 post(|| async { StatusCode::NO_CONTENT }),
1459 );
1460
1461 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1462 let addr = listener.local_addr().unwrap();
1463 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1464 format!("127.0.0.1:{}", addr.port())
1465 }
1466
1467 #[tokio::test]
1468 async fn test_execute_list_success() {
1469 let node = start_test_server().await;
1470 let cli = Cli {
1471 node,
1472 token: None,
1473 command: Commands::List,
1474 };
1475 let result = execute(&cli).await.unwrap();
1476 assert_eq!(result, "No sessions.");
1477 }
1478
1479 #[tokio::test]
1480 async fn test_execute_nodes_success() {
1481 let node = start_test_server().await;
1482 let cli = Cli {
1483 node,
1484 token: None,
1485 command: Commands::Nodes,
1486 };
1487 let result = execute(&cli).await.unwrap();
1488 assert!(result.contains("test"));
1489 assert!(result.contains("(local)"));
1490 assert!(result.contains("NAME"));
1491 }
1492
1493 #[tokio::test]
1494 async fn test_execute_spawn_success() {
1495 let node = start_test_server().await;
1496 let cli = Cli {
1497 node,
1498 token: None,
1499 command: Commands::Spawn {
1500 workdir: Some("/tmp/repo".into()),
1501 name: None,
1502 provider: Some("claude".into()),
1503 auto: false,
1504 unrestricted: false,
1505 model: None,
1506 system_prompt: None,
1507 allowed_tools: None,
1508 ink: None,
1509 max_turns: None,
1510 max_budget: None,
1511 output_format: None,
1512 prompt: vec!["Fix".into(), "bug".into()],
1513 },
1514 };
1515 let result = execute(&cli).await.unwrap();
1516 assert!(result.contains("Created session"));
1517 assert!(result.contains("repo"));
1518 }
1519
1520 #[tokio::test]
1521 async fn test_execute_spawn_with_all_new_flags() {
1522 let node = start_test_server().await;
1523 let cli = Cli {
1524 node,
1525 token: None,
1526 command: Commands::Spawn {
1527 workdir: Some("/tmp/repo".into()),
1528 name: None,
1529 provider: Some("claude".into()),
1530 auto: false,
1531 unrestricted: false,
1532 model: Some("opus".into()),
1533 system_prompt: Some("Be helpful".into()),
1534 allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1535 ink: Some("coder".into()),
1536 max_turns: Some(5),
1537 max_budget: Some(2.5),
1538 output_format: Some("json".into()),
1539 prompt: vec!["Fix".into(), "bug".into()],
1540 },
1541 };
1542 let result = execute(&cli).await.unwrap();
1543 assert!(result.contains("Created session"));
1544 }
1545
1546 #[tokio::test]
1547 async fn test_execute_spawn_auto_mode() {
1548 let node = start_test_server().await;
1549 let cli = Cli {
1550 node,
1551 token: None,
1552 command: Commands::Spawn {
1553 workdir: Some("/tmp/repo".into()),
1554 name: None,
1555 provider: Some("claude".into()),
1556 auto: true,
1557 unrestricted: false,
1558 model: None,
1559 system_prompt: None,
1560 allowed_tools: None,
1561 ink: None,
1562 max_turns: None,
1563 max_budget: None,
1564 output_format: None,
1565 prompt: vec!["Do it".into()],
1566 },
1567 };
1568 let result = execute(&cli).await.unwrap();
1569 assert!(result.contains("Created session"));
1570 }
1571
1572 #[tokio::test]
1573 async fn test_execute_spawn_with_name() {
1574 let node = start_test_server().await;
1575 let cli = Cli {
1576 node,
1577 token: None,
1578 command: Commands::Spawn {
1579 workdir: Some("/tmp/repo".into()),
1580 name: Some("my-task".into()),
1581 provider: Some("claude".into()),
1582 auto: false,
1583 unrestricted: false,
1584 model: None,
1585 system_prompt: None,
1586 allowed_tools: None,
1587 ink: None,
1588 max_turns: None,
1589 max_budget: None,
1590 output_format: None,
1591 prompt: vec!["Fix".into(), "bug".into()],
1592 },
1593 };
1594 let result = execute(&cli).await.unwrap();
1595 assert!(result.contains("Created session"));
1596 }
1597
1598 #[tokio::test]
1599 async fn test_execute_kill_success() {
1600 let node = start_test_server().await;
1601 let cli = Cli {
1602 node,
1603 token: None,
1604 command: Commands::Kill {
1605 name: "test-session".into(),
1606 },
1607 };
1608 let result = execute(&cli).await.unwrap();
1609 assert!(result.contains("killed"));
1610 }
1611
1612 #[tokio::test]
1613 async fn test_execute_delete_success() {
1614 let node = start_test_server().await;
1615 let cli = Cli {
1616 node,
1617 token: None,
1618 command: Commands::Delete {
1619 name: "test-session".into(),
1620 },
1621 };
1622 let result = execute(&cli).await.unwrap();
1623 assert!(result.contains("deleted"));
1624 }
1625
1626 #[tokio::test]
1627 async fn test_execute_logs_success() {
1628 let node = start_test_server().await;
1629 let cli = Cli {
1630 node,
1631 token: None,
1632 command: Commands::Logs {
1633 name: "test-session".into(),
1634 lines: 50,
1635 follow: false,
1636 },
1637 };
1638 let result = execute(&cli).await.unwrap();
1639 assert!(result.contains("test output"));
1640 }
1641
1642 #[tokio::test]
1643 async fn test_execute_list_connection_refused() {
1644 let cli = Cli {
1645 node: "localhost:1".into(),
1646 token: None,
1647 command: Commands::List,
1648 };
1649 let result = execute(&cli).await;
1650 let err = result.unwrap_err().to_string();
1651 assert!(
1652 err.contains("Could not connect to pulpod"),
1653 "Expected friendly error, got: {err}"
1654 );
1655 assert!(err.contains("localhost:1"));
1656 }
1657
1658 #[tokio::test]
1659 async fn test_execute_nodes_connection_refused() {
1660 let cli = Cli {
1661 node: "localhost:1".into(),
1662 token: None,
1663 command: Commands::Nodes,
1664 };
1665 let result = execute(&cli).await;
1666 let err = result.unwrap_err().to_string();
1667 assert!(err.contains("Could not connect to pulpod"));
1668 }
1669
1670 #[tokio::test]
1671 async fn test_execute_kill_error_response() {
1672 use axum::{Router, http::StatusCode, routing::post};
1673
1674 let app = Router::new().route(
1675 "/api/v1/sessions/{id}/kill",
1676 post(|| async {
1677 (
1678 StatusCode::NOT_FOUND,
1679 "{\"error\":\"session not found: test-session\"}",
1680 )
1681 }),
1682 );
1683 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1684 let addr = listener.local_addr().unwrap();
1685 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1686 let node = format!("127.0.0.1:{}", addr.port());
1687
1688 let cli = Cli {
1689 node,
1690 token: None,
1691 command: Commands::Kill {
1692 name: "test-session".into(),
1693 },
1694 };
1695 let err = execute(&cli).await.unwrap_err();
1696 assert_eq!(err.to_string(), "session not found: test-session");
1697 }
1698
1699 #[tokio::test]
1700 async fn test_execute_delete_error_response() {
1701 use axum::{Router, http::StatusCode, routing::delete};
1702
1703 let app = Router::new().route(
1704 "/api/v1/sessions/{id}",
1705 delete(|| async {
1706 (
1707 StatusCode::CONFLICT,
1708 "{\"error\":\"cannot delete session in 'running' state\"}",
1709 )
1710 }),
1711 );
1712 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1713 let addr = listener.local_addr().unwrap();
1714 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1715 let node = format!("127.0.0.1:{}", addr.port());
1716
1717 let cli = Cli {
1718 node,
1719 token: None,
1720 command: Commands::Delete {
1721 name: "test-session".into(),
1722 },
1723 };
1724 let err = execute(&cli).await.unwrap_err();
1725 assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1726 }
1727
1728 #[tokio::test]
1729 async fn test_execute_logs_error_response() {
1730 use axum::{Router, http::StatusCode, routing::get};
1731
1732 let app = Router::new().route(
1733 "/api/v1/sessions/{id}/output",
1734 get(|| async {
1735 (
1736 StatusCode::NOT_FOUND,
1737 "{\"error\":\"session not found: ghost\"}",
1738 )
1739 }),
1740 );
1741 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1742 let addr = listener.local_addr().unwrap();
1743 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1744 let node = format!("127.0.0.1:{}", addr.port());
1745
1746 let cli = Cli {
1747 node,
1748 token: None,
1749 command: Commands::Logs {
1750 name: "ghost".into(),
1751 lines: 50,
1752 follow: false,
1753 },
1754 };
1755 let err = execute(&cli).await.unwrap_err();
1756 assert_eq!(err.to_string(), "session not found: ghost");
1757 }
1758
1759 #[tokio::test]
1760 async fn test_execute_resume_error_response() {
1761 use axum::{Router, http::StatusCode, routing::post};
1762
1763 let app = Router::new().route(
1764 "/api/v1/sessions/{id}/resume",
1765 post(|| async {
1766 (
1767 StatusCode::BAD_REQUEST,
1768 "{\"error\":\"session is not stale (status: running)\"}",
1769 )
1770 }),
1771 );
1772 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1773 let addr = listener.local_addr().unwrap();
1774 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1775 let node = format!("127.0.0.1:{}", addr.port());
1776
1777 let cli = Cli {
1778 node,
1779 token: None,
1780 command: Commands::Resume {
1781 name: "test-session".into(),
1782 },
1783 };
1784 let err = execute(&cli).await.unwrap_err();
1785 assert_eq!(err.to_string(), "session is not stale (status: running)");
1786 }
1787
1788 #[tokio::test]
1789 async fn test_execute_spawn_error_response() {
1790 use axum::{Router, http::StatusCode, routing::post};
1791
1792 let app = Router::new().route(
1793 "/api/v1/sessions",
1794 post(|| async {
1795 (
1796 StatusCode::INTERNAL_SERVER_ERROR,
1797 "{\"error\":\"failed to spawn session\"}",
1798 )
1799 }),
1800 );
1801 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1802 let addr = listener.local_addr().unwrap();
1803 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1804 let node = format!("127.0.0.1:{}", addr.port());
1805
1806 let cli = Cli {
1807 node,
1808 token: None,
1809 command: Commands::Spawn {
1810 workdir: Some("/tmp/repo".into()),
1811 name: None,
1812 provider: Some("claude".into()),
1813 auto: false,
1814 unrestricted: false,
1815 model: None,
1816 system_prompt: None,
1817 allowed_tools: None,
1818 ink: None,
1819 max_turns: None,
1820 max_budget: None,
1821 output_format: None,
1822 prompt: vec!["test".into()],
1823 },
1824 };
1825 let err = execute(&cli).await.unwrap_err();
1826 assert_eq!(err.to_string(), "failed to spawn session");
1827 }
1828
1829 #[tokio::test]
1830 async fn test_execute_interventions_error_response() {
1831 use axum::{Router, http::StatusCode, routing::get};
1832
1833 let app = Router::new().route(
1834 "/api/v1/sessions/{id}/interventions",
1835 get(|| async {
1836 (
1837 StatusCode::NOT_FOUND,
1838 "{\"error\":\"session not found: ghost\"}",
1839 )
1840 }),
1841 );
1842 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1843 let addr = listener.local_addr().unwrap();
1844 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1845 let node = format!("127.0.0.1:{}", addr.port());
1846
1847 let cli = Cli {
1848 node,
1849 token: None,
1850 command: Commands::Interventions {
1851 name: "ghost".into(),
1852 },
1853 };
1854 let err = execute(&cli).await.unwrap_err();
1855 assert_eq!(err.to_string(), "session not found: ghost");
1856 }
1857
1858 #[tokio::test]
1859 async fn test_execute_resume_success() {
1860 let node = start_test_server().await;
1861 let cli = Cli {
1862 node,
1863 token: None,
1864 command: Commands::Resume {
1865 name: "test-session".into(),
1866 },
1867 };
1868 let result = execute(&cli).await.unwrap();
1869 assert!(result.contains("Resumed session"));
1870 assert!(result.contains("repo"));
1871 }
1872
1873 #[tokio::test]
1874 async fn test_execute_input_success() {
1875 let node = start_test_server().await;
1876 let cli = Cli {
1877 node,
1878 token: None,
1879 command: Commands::Input {
1880 name: "test-session".into(),
1881 text: Some("yes".into()),
1882 },
1883 };
1884 let result = execute(&cli).await.unwrap();
1885 assert!(result.contains("Sent input to session test-session"));
1886 }
1887
1888 #[tokio::test]
1889 async fn test_execute_input_no_text() {
1890 let node = start_test_server().await;
1891 let cli = Cli {
1892 node,
1893 token: None,
1894 command: Commands::Input {
1895 name: "test-session".into(),
1896 text: None,
1897 },
1898 };
1899 let result = execute(&cli).await.unwrap();
1900 assert!(result.contains("Sent input to session test-session"));
1901 }
1902
1903 #[tokio::test]
1904 async fn test_execute_input_connection_refused() {
1905 let cli = Cli {
1906 node: "localhost:1".into(),
1907 token: None,
1908 command: Commands::Input {
1909 name: "test".into(),
1910 text: Some("y".into()),
1911 },
1912 };
1913 let result = execute(&cli).await;
1914 let err = result.unwrap_err().to_string();
1915 assert!(err.contains("Could not connect to pulpod"));
1916 }
1917
1918 #[tokio::test]
1919 async fn test_execute_input_error_response() {
1920 use axum::{Router, http::StatusCode, routing::post};
1921
1922 let app = Router::new().route(
1923 "/api/v1/sessions/{id}/input",
1924 post(|| async {
1925 (
1926 StatusCode::NOT_FOUND,
1927 "{\"error\":\"session not found: ghost\"}",
1928 )
1929 }),
1930 );
1931 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1932 let addr = listener.local_addr().unwrap();
1933 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1934 let node = format!("127.0.0.1:{}", addr.port());
1935
1936 let cli = Cli {
1937 node,
1938 token: None,
1939 command: Commands::Input {
1940 name: "ghost".into(),
1941 text: Some("y".into()),
1942 },
1943 };
1944 let err = execute(&cli).await.unwrap_err();
1945 assert_eq!(err.to_string(), "session not found: ghost");
1946 }
1947
1948 #[tokio::test]
1949 async fn test_execute_ui() {
1950 let cli = Cli {
1951 node: "localhost:7433".into(),
1952 token: None,
1953 command: Commands::Ui,
1954 };
1955 let result = execute(&cli).await.unwrap();
1956 assert!(result.contains("Opening"));
1957 assert!(result.contains("http://localhost:7433"));
1958 }
1959
1960 #[tokio::test]
1961 async fn test_execute_ui_custom_node() {
1962 let cli = Cli {
1963 node: "mac-mini:7433".into(),
1964 token: None,
1965 command: Commands::Ui,
1966 };
1967 let result = execute(&cli).await.unwrap();
1968 assert!(result.contains("http://mac-mini:7433"));
1969 }
1970
/// An empty session slice renders as the "no sessions" message.
#[test]
fn test_format_sessions_empty() {
    let rendered = format_sessions(&[]);
    assert_eq!(rendered, "No sessions.");
}
1975
/// A single running session renders with the header plus its name, status, provider and prompt.
#[test]
fn test_format_sessions_with_data() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "my-api".into(),
        workdir: "/tmp/repo".into(),
        provider: Provider::Claude,
        prompt: "Fix the bug".into(),
        status: SessionStatus::Running,
        mode: SessionMode::Interactive,
        waiting_for_input: false,
        created_at: Utc::now(),
        updated_at: Utc::now(),
        // Everything below is irrelevant to the rendered columns checked here.
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
    };
    let output = format_sessions(&[session]);
    for expected in ["NAME", "my-api", "running", "claude", "Fix the bug"] {
        assert!(output.contains(expected));
    }
}
2018
/// A prompt longer than forty characters is rendered with an ellipsis.
#[test]
fn test_format_sessions_long_prompt_truncated() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        provider: Provider::Codex,
        prompt: "A very long prompt that exceeds forty characters in total length".into(),
        status: SessionStatus::Completed,
        mode: SessionMode::Autonomous,
        waiting_for_input: false,
        created_at: Utc::now(),
        updated_at: Utc::now(),
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
    };
    let output = format_sessions(&[session]);
    assert!(output.contains("..."));
}
2057
/// A running session flagged `waiting_for_input` is shown as "waiting" rather than "running".
#[test]
fn test_format_sessions_waiting_for_input() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "blocked".into(),
        workdir: "/tmp".into(),
        provider: Provider::Claude,
        prompt: "Fix bug".into(),
        status: SessionStatus::Running,
        mode: SessionMode::Interactive,
        waiting_for_input: true,
        created_at: Utc::now(),
        updated_at: Utc::now(),
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
    };
    let output = format_sessions(&[session]);
    assert!(output.contains("waiting"));
    assert!(!output.contains("running"));
}
2097
/// The node table lists the local node (marked `(local)`) and a configured peer with its
/// session count.
#[test]
fn test_format_nodes() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "mac-mini".into(),
        hostname: "h".into(),
        os: "macos".into(),
        arch: "arm64".into(),
        cpus: 8,
        memory_mb: 16384,
        gpu: None,
    };
    let peer = PeerInfo {
        name: "win-pc".into(),
        address: "win-pc:7433".into(),
        status: PeerStatus::Online,
        node_info: None,
        session_count: Some(3),
        source: PeerSource::Configured,
    };
    let resp = PeersResponse {
        local,
        peers: vec![peer],
    };

    let output = format_nodes(&resp);
    for expected in ["mac-mini", "(local)", "win-pc"] {
        assert!(output.contains(expected));
    }
    assert!(output.contains('3'));
}
2128
/// An offline peer without a session count renders its status and a `-` on the third output
/// line.
#[test]
fn test_format_nodes_no_session_count() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "local".into(),
        hostname: "h".into(),
        os: "linux".into(),
        arch: "x86_64".into(),
        cpus: 4,
        memory_mb: 8192,
        gpu: None,
    };
    let peer = PeerInfo {
        name: "peer".into(),
        address: "peer:7433".into(),
        status: PeerStatus::Offline,
        node_info: None,
        session_count: None,
        source: PeerSource::Configured,
    };
    let resp = PeersResponse {
        local,
        peers: vec![peer],
    };

    let output = format_nodes(&resp);
    assert!(output.contains("offline"));
    // Index 2 is the third line of the output; panics (failing the test) if it is absent.
    let third_line = output.lines().nth(2).unwrap();
    assert!(third_line.contains('-'));
}
2159
2160 #[tokio::test]
2161 async fn test_execute_resume_connection_refused() {
2162 let cli = Cli {
2163 node: "localhost:1".into(),
2164 token: None,
2165 command: Commands::Resume {
2166 name: "test".into(),
2167 },
2168 };
2169 let result = execute(&cli).await;
2170 let err = result.unwrap_err().to_string();
2171 assert!(err.contains("Could not connect to pulpod"));
2172 }
2173
2174 #[tokio::test]
2175 async fn test_execute_spawn_connection_refused() {
2176 let cli = Cli {
2177 node: "localhost:1".into(),
2178 token: None,
2179 command: Commands::Spawn {
2180 workdir: Some("/tmp".into()),
2181 name: None,
2182 provider: Some("claude".into()),
2183 auto: false,
2184 unrestricted: false,
2185 model: None,
2186 system_prompt: None,
2187 allowed_tools: None,
2188 ink: None,
2189 max_turns: None,
2190 max_budget: None,
2191 output_format: None,
2192 prompt: vec!["test".into()],
2193 },
2194 };
2195 let result = execute(&cli).await;
2196 let err = result.unwrap_err().to_string();
2197 assert!(err.contains("Could not connect to pulpod"));
2198 }
2199
2200 #[tokio::test]
2201 async fn test_execute_kill_connection_refused() {
2202 let cli = Cli {
2203 node: "localhost:1".into(),
2204 token: None,
2205 command: Commands::Kill {
2206 name: "test".into(),
2207 },
2208 };
2209 let result = execute(&cli).await;
2210 let err = result.unwrap_err().to_string();
2211 assert!(err.contains("Could not connect to pulpod"));
2212 }
2213
2214 #[tokio::test]
2215 async fn test_execute_delete_connection_refused() {
2216 let cli = Cli {
2217 node: "localhost:1".into(),
2218 token: None,
2219 command: Commands::Delete {
2220 name: "test".into(),
2221 },
2222 };
2223 let result = execute(&cli).await;
2224 let err = result.unwrap_err().to_string();
2225 assert!(err.contains("Could not connect to pulpod"));
2226 }
2227
2228 #[tokio::test]
2229 async fn test_execute_logs_connection_refused() {
2230 let cli = Cli {
2231 node: "localhost:1".into(),
2232 token: None,
2233 command: Commands::Logs {
2234 name: "test".into(),
2235 lines: 50,
2236 follow: false,
2237 },
2238 };
2239 let result = execute(&cli).await;
2240 let err = result.unwrap_err().to_string();
2241 assert!(err.contains("Could not connect to pulpod"));
2242 }
2243
2244 #[tokio::test]
2245 async fn test_friendly_error_connect() {
2246 let err = reqwest::Client::new()
2248 .get("http://127.0.0.1:1")
2249 .send()
2250 .await
2251 .unwrap_err();
2252 let friendly = friendly_error(&err, "test-node:1");
2253 let msg = friendly.to_string();
2254 assert!(
2255 msg.contains("Could not connect"),
2256 "Expected connect message, got: {msg}"
2257 );
2258 }
2259
2260 #[tokio::test]
2261 async fn test_friendly_error_other() {
2262 let err = reqwest::Client::new()
2264 .get("http://[::invalid::url")
2265 .send()
2266 .await
2267 .unwrap_err();
2268 let friendly = friendly_error(&err, "bad-host");
2269 let msg = friendly.to_string();
2270 assert!(
2271 msg.contains("Network error"),
2272 "Expected network error message, got: {msg}"
2273 );
2274 assert!(msg.contains("bad-host"));
2275 }
2276
/// Loopback spellings (with or without a port) count as localhost; other hosts do not.
#[test]
fn test_is_localhost_variants() {
    let loopback = [
        "localhost:7433",
        "127.0.0.1:7433",
        "[::1]:7433",
        "::1",
        "localhost",
    ];
    for node in loopback {
        assert!(is_localhost(node));
    }

    let remote = ["mac-mini:7433", "192.168.1.100:7433"];
    for node in remote {
        assert!(!is_localhost(node));
    }
}
2289
/// With a token, `authed_get` attaches an `Authorization: Bearer …` header.
#[test]
fn test_authed_get_with_token() {
    let client = reqwest::Client::new();
    let builder = authed_get(&client, "http://h:1/api".into(), Some("tok"));
    let request = builder.build().unwrap();
    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer tok");
}
2304
/// Without a token, `authed_get` sends no `Authorization` header.
#[test]
fn test_authed_get_without_token() {
    let client = reqwest::Client::new();
    let builder = authed_get(&client, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();
    assert!(!request.headers().contains_key("authorization"));
}
2313
/// With a token, `authed_post` attaches an `Authorization: Bearer …` header.
#[test]
fn test_authed_post_with_token() {
    let client = reqwest::Client::new();
    let builder = authed_post(&client, "http://h:1/api".into(), Some("secret"));
    let request = builder.build().unwrap();
    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer secret");
}
2328
/// Without a token, `authed_post` sends no `Authorization` header.
#[test]
fn test_authed_post_without_token() {
    let client = reqwest::Client::new();
    let builder = authed_post(&client, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();
    assert!(!request.headers().contains_key("authorization"));
}
2337
/// With a token, `authed_delete` attaches an `Authorization: Bearer …` header.
#[test]
fn test_authed_delete_with_token() {
    let client = reqwest::Client::new();
    let builder = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"));
    let request = builder.build().unwrap();
    let header = request.headers().get("authorization").unwrap();
    assert_eq!(header.to_str().unwrap(), "Bearer del-tok");
}
2352
/// Without a token, `authed_delete` sends no `Authorization` header.
#[test]
fn test_authed_delete_without_token() {
    let client = reqwest::Client::new();
    let builder = authed_delete(&client, "http://h:1/api".into(), None);
    let request = builder.build().unwrap();
    assert!(!request.headers().contains_key("authorization"));
}
2361
2362 #[tokio::test]
2363 async fn test_resolve_token_explicit() {
2364 let client = reqwest::Client::new();
2365 let token =
2366 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2367 assert_eq!(token, Some("my-tok".into()));
2368 }
2369
2370 #[tokio::test]
2371 async fn test_resolve_token_remote_no_explicit() {
2372 let client = reqwest::Client::new();
2373 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2374 assert_eq!(token, None);
2375 }
2376
2377 #[tokio::test]
2378 async fn test_resolve_token_localhost_auto_discover() {
2379 use axum::{Json, Router, routing::get};
2380
2381 let app = Router::new().route(
2382 "/api/v1/auth/token",
2383 get(|| async {
2384 Json(AuthTokenResponse {
2385 token: "discovered".into(),
2386 })
2387 }),
2388 );
2389 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2390 let addr = listener.local_addr().unwrap();
2391 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2392
2393 let node = format!("localhost:{}", addr.port());
2394 let base = base_url(&node);
2395 let client = reqwest::Client::new();
2396 let token = resolve_token(&client, &base, &node, None).await;
2397 assert_eq!(token, Some("discovered".into()));
2398 }
2399
2400 #[tokio::test]
2401 async fn test_discover_token_empty_returns_none() {
2402 use axum::{Json, Router, routing::get};
2403
2404 let app = Router::new().route(
2405 "/api/v1/auth/token",
2406 get(|| async {
2407 Json(AuthTokenResponse {
2408 token: String::new(),
2409 })
2410 }),
2411 );
2412 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2413 let addr = listener.local_addr().unwrap();
2414 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2415
2416 let base = format!("http://127.0.0.1:{}", addr.port());
2417 let client = reqwest::Client::new();
2418 assert_eq!(discover_token(&client, &base).await, None);
2419 }
2420
2421 #[tokio::test]
2422 async fn test_discover_token_unreachable_returns_none() {
2423 let client = reqwest::Client::new();
2424 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2425 }
2426
/// A global `--token` value is captured on the parsed `Cli`.
#[test]
fn test_cli_parse_with_token() {
    let argv = ["pulpo", "--token", "my-secret", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert_eq!(parsed.token.as_deref(), Some("my-secret"));
}
2432
/// Without `--token`, the parsed `Cli` carries no token.
#[test]
fn test_cli_parse_without_token() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    assert!(parsed.token.is_none());
}
2438
2439 #[tokio::test]
2440 async fn test_execute_with_explicit_token_sends_header() {
2441 use axum::{Router, extract::Request, http::StatusCode, routing::get};
2442
2443 let app = Router::new().route(
2444 "/api/v1/sessions",
2445 get(|req: Request| async move {
2446 let auth = req
2447 .headers()
2448 .get("authorization")
2449 .and_then(|v| v.to_str().ok())
2450 .unwrap_or("");
2451 assert_eq!(auth, "Bearer test-token");
2452 (StatusCode::OK, "[]".to_owned())
2453 }),
2454 );
2455 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2456 let addr = listener.local_addr().unwrap();
2457 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2458 let node = format!("127.0.0.1:{}", addr.port());
2459
2460 let cli = Cli {
2461 node,
2462 token: Some("test-token".into()),
2463 command: Commands::List,
2464 };
2465 let result = execute(&cli).await.unwrap();
2466 assert_eq!(result, "No sessions.");
2467 }
2468
/// `interventions <name>` parses into `Commands::Interventions` carrying the session name.
#[test]
fn test_cli_parse_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
    let Commands::Interventions { name } = &parsed.command else {
        panic!("expected Interventions, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
2479
/// An empty event slice renders as the "no intervention events" message.
#[test]
fn test_format_interventions_empty() {
    let rendered = format_interventions(&[]);
    assert_eq!(rendered, "No intervention events.");
}
2484
/// The interventions table shows its three column headers, each event's reason, and the
/// first event's timestamp.
#[test]
fn test_format_interventions_with_data() {
    let first = InterventionEventResponse {
        id: 1,
        session_id: "sess-1".into(),
        reason: "Memory exceeded threshold".into(),
        created_at: "2026-01-01T00:00:00Z".into(),
    };
    let second = InterventionEventResponse {
        id: 2,
        session_id: "sess-1".into(),
        reason: "Idle for 10 minutes".into(),
        created_at: "2026-01-02T00:00:00Z".into(),
    };

    let output = format_interventions(&[first, second]);
    let expected_fragments = [
        "ID",
        "TIMESTAMP",
        "REASON",
        "Memory exceeded threshold",
        "Idle for 10 minutes",
        "2026-01-01T00:00:00Z",
    ];
    for fragment in expected_fragments {
        assert!(output.contains(fragment));
    }
}
2509
2510 #[tokio::test]
2511 async fn test_execute_interventions_empty() {
2512 let node = start_test_server().await;
2513 let cli = Cli {
2514 node,
2515 token: None,
2516 command: Commands::Interventions {
2517 name: "my-session".into(),
2518 },
2519 };
2520 let result = execute(&cli).await.unwrap();
2521 assert_eq!(result, "No intervention events.");
2522 }
2523
2524 #[tokio::test]
2525 async fn test_execute_interventions_with_data() {
2526 use axum::{Router, routing::get};
2527
2528 let app = Router::new().route(
2529 "/api/v1/sessions/{id}/interventions",
2530 get(|| async {
2531 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2532 .to_owned()
2533 }),
2534 );
2535 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2536 let addr = listener.local_addr().unwrap();
2537 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2538 let node = format!("127.0.0.1:{}", addr.port());
2539
2540 let cli = Cli {
2541 node,
2542 token: None,
2543 command: Commands::Interventions {
2544 name: "test".into(),
2545 },
2546 };
2547 let result = execute(&cli).await.unwrap();
2548 assert!(result.contains("OOM"));
2549 assert!(result.contains("2026-01-01T00:00:00Z"));
2550 }
2551
2552 #[tokio::test]
2553 async fn test_execute_interventions_connection_refused() {
2554 let cli = Cli {
2555 node: "localhost:1".into(),
2556 token: None,
2557 command: Commands::Interventions {
2558 name: "test".into(),
2559 },
2560 };
2561 let result = execute(&cli).await;
2562 let err = result.unwrap_err().to_string();
2563 assert!(err.contains("Could not connect to pulpod"));
2564 }
2565
2566 #[test]
2569 fn test_build_attach_command() {
2570 let cmd = build_attach_command("pulpo-my-session");
2571 assert_eq!(cmd.get_program(), "tmux");
2572 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2573 assert_eq!(args, vec!["attach-session", "-t", "pulpo-my-session"]);
2574 }
2575
2576 #[test]
2577 fn test_cli_parse_attach() {
2578 let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2579 assert!(matches!(
2580 &cli.command,
2581 Commands::Attach { name } if name == "my-session"
2582 ));
2583 }
2584
2585 #[test]
2586 fn test_cli_parse_attach_alias() {
2587 let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2588 assert!(matches!(
2589 &cli.command,
2590 Commands::Attach { name } if name == "my-session"
2591 ));
2592 }
2593
2594 #[tokio::test]
2595 async fn test_execute_attach_success() {
2596 let node = start_test_server().await;
2597 let cli = Cli {
2598 node,
2599 token: None,
2600 command: Commands::Attach {
2601 name: "test-session".into(),
2602 },
2603 };
2604 let result = execute(&cli).await.unwrap();
2605 assert!(result.contains("Detached from session test-session"));
2606 }
2607
2608 #[tokio::test]
2609 async fn test_execute_attach_with_backend_session_id() {
2610 use axum::{Router, routing::get};
2611 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","provider":"claude","prompt":"test","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"pulpo-my-session","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2612 let app = Router::new().route(
2613 "/api/v1/sessions/{id}",
2614 get(move || async move { session_json.to_owned() }),
2615 );
2616 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2617 let addr = listener.local_addr().unwrap();
2618 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2619
2620 let cli = Cli {
2621 node: format!("127.0.0.1:{}", addr.port()),
2622 token: None,
2623 command: Commands::Attach {
2624 name: "my-session".into(),
2625 },
2626 };
2627 let result = execute(&cli).await.unwrap();
2628 assert!(result.contains("Detached from session my-session"));
2629 }
2630
2631 #[tokio::test]
2632 async fn test_execute_attach_connection_refused() {
2633 let cli = Cli {
2634 node: "localhost:1".into(),
2635 token: None,
2636 command: Commands::Attach {
2637 name: "test-session".into(),
2638 },
2639 };
2640 let result = execute(&cli).await;
2641 let err = result.unwrap_err().to_string();
2642 assert!(err.contains("Could not connect to pulpod"));
2643 }
2644
2645 #[tokio::test]
2646 async fn test_execute_attach_error_response() {
2647 use axum::{Router, http::StatusCode, routing::get};
2648 let app = Router::new().route(
2649 "/api/v1/sessions/{id}",
2650 get(|| async {
2651 (
2652 StatusCode::NOT_FOUND,
2653 r#"{"error":"session not found"}"#.to_owned(),
2654 )
2655 }),
2656 );
2657 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2658 let addr = listener.local_addr().unwrap();
2659 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2660
2661 let cli = Cli {
2662 node: format!("127.0.0.1:{}", addr.port()),
2663 token: None,
2664 command: Commands::Attach {
2665 name: "nonexistent".into(),
2666 },
2667 };
2668 let result = execute(&cli).await;
2669 let err = result.unwrap_err().to_string();
2670 assert!(err.contains("session not found"));
2671 }
2672
2673 #[test]
2676 fn test_cli_parse_alias_spawn() {
2677 let cli = Cli::try_parse_from(["pulpo", "s", "--workdir", "/tmp", "Do it"]).unwrap();
2678 assert!(matches!(&cli.command, Commands::Spawn { .. }));
2679 }
2680
2681 #[test]
2682 fn test_cli_parse_alias_list() {
2683 let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2684 assert!(matches!(cli.command, Commands::List));
2685 }
2686
2687 #[test]
2688 fn test_cli_parse_alias_logs() {
2689 let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2690 assert!(matches!(
2691 &cli.command,
2692 Commands::Logs { name, .. } if name == "my-session"
2693 ));
2694 }
2695
2696 #[test]
2697 fn test_cli_parse_alias_kill() {
2698 let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2699 assert!(matches!(
2700 &cli.command,
2701 Commands::Kill { name } if name == "my-session"
2702 ));
2703 }
2704
2705 #[test]
2706 fn test_cli_parse_alias_delete() {
2707 let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2708 assert!(matches!(
2709 &cli.command,
2710 Commands::Delete { name } if name == "my-session"
2711 ));
2712 }
2713
2714 #[test]
2715 fn test_cli_parse_alias_resume() {
2716 let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
2717 assert!(matches!(
2718 &cli.command,
2719 Commands::Resume { name } if name == "my-session"
2720 ));
2721 }
2722
2723 #[test]
2724 fn test_cli_parse_alias_nodes() {
2725 let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
2726 assert!(matches!(cli.command, Commands::Nodes));
2727 }
2728
2729 #[test]
2730 fn test_cli_parse_alias_interventions() {
2731 let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
2732 assert!(matches!(
2733 &cli.command,
2734 Commands::Interventions { name } if name == "my-session"
2735 ));
2736 }
2737
2738 #[test]
2739 fn test_api_error_json() {
2740 let err = api_error("{\"error\":\"session not found: foo\"}");
2741 assert_eq!(err.to_string(), "session not found: foo");
2742 }
2743
2744 #[test]
2745 fn test_api_error_plain_text() {
2746 let err = api_error("plain text error");
2747 assert_eq!(err.to_string(), "plain text error");
2748 }
2749
2750 #[test]
2753 fn test_diff_output_empty_prev() {
2754 assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
2755 }
2756
2757 #[test]
2758 fn test_diff_output_identical() {
2759 assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
2760 }
2761
2762 #[test]
2763 fn test_diff_output_new_lines_appended() {
2764 let prev = "line1\nline2";
2765 let new = "line1\nline2\nline3\nline4";
2766 assert_eq!(diff_output(prev, new), "line3\nline4");
2767 }
2768
2769 #[test]
2770 fn test_diff_output_scrolled_window() {
2771 let prev = "line1\nline2\nline3";
2773 let new = "line2\nline3\nline4";
2774 assert_eq!(diff_output(prev, new), "line4");
2775 }
2776
2777 #[test]
2778 fn test_diff_output_completely_different() {
2779 let prev = "aaa\nbbb";
2780 let new = "xxx\nyyy";
2781 assert_eq!(diff_output(prev, new), "xxx\nyyy");
2782 }
2783
2784 #[test]
2785 fn test_diff_output_last_line_matches_but_overlap_fails() {
2786 let prev = "aaa\ncommon";
2788 let new = "zzz\ncommon\nnew_line";
2789 assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
2793 }
2794
2795 #[test]
2796 fn test_diff_output_new_empty() {
2797 assert_eq!(diff_output("line1", ""), "");
2798 }
2799
2800 async fn start_follow_test_server() -> String {
2804 use axum::{Router, extract::Path, extract::Query, routing::get};
2805 use std::sync::Arc;
2806 use std::sync::atomic::{AtomicUsize, Ordering};
2807
2808 let call_count = Arc::new(AtomicUsize::new(0));
2809 let output_count = call_count.clone();
2810 let status_count = Arc::new(AtomicUsize::new(0));
2811 let status_count_inner = status_count.clone();
2812
2813 let app = Router::new()
2814 .route(
2815 "/api/v1/sessions/{id}/output",
2816 get(
2817 move |_path: Path<String>,
2818 _query: Query<std::collections::HashMap<String, String>>| {
2819 let count = output_count.clone();
2820 async move {
2821 let n = count.fetch_add(1, Ordering::SeqCst);
2822 let output = match n {
2823 0 => "line1\nline2".to_owned(),
2824 1 => "line1\nline2\nline3".to_owned(),
2825 _ => "line2\nline3\nline4".to_owned(),
2826 };
2827 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2828 }
2829 },
2830 ),
2831 )
2832 .route(
2833 "/api/v1/sessions/{id}",
2834 get(move |_path: Path<String>| {
2835 let count = status_count_inner.clone();
2836 async move {
2837 let n = count.fetch_add(1, Ordering::SeqCst);
2838 let status = if n < 2 { "running" } else { "completed" };
2839 format!(
2840 r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
2841 )
2842 }
2843 }),
2844 );
2845
2846 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2847 let addr = listener.local_addr().unwrap();
2848 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2849 format!("http://127.0.0.1:{}", addr.port())
2850 }
2851
2852 #[tokio::test]
2853 async fn test_follow_logs_polls_and_exits_on_completed() {
2854 let base = start_follow_test_server().await;
2855 let client = reqwest::Client::new();
2856 let mut buf = Vec::new();
2857
2858 follow_logs(&client, &base, "test", 100, None, &mut buf)
2859 .await
2860 .unwrap();
2861
2862 let output = String::from_utf8(buf).unwrap();
2863 assert!(output.contains("line1"));
2865 assert!(output.contains("line2"));
2866 assert!(output.contains("line3"));
2867 assert!(output.contains("line4"));
2868 }
2869
2870 #[tokio::test]
2871 async fn test_execute_logs_follow_success() {
2872 let base = start_follow_test_server().await;
2873 let node = base.strip_prefix("http://").unwrap().to_owned();
2875
2876 let cli = Cli {
2877 node,
2878 token: None,
2879 command: Commands::Logs {
2880 name: "test".into(),
2881 lines: 100,
2882 follow: true,
2883 },
2884 };
2885 let result = execute(&cli).await.unwrap();
2887 assert_eq!(result, "");
2888 }
2889
2890 #[tokio::test]
2891 async fn test_execute_logs_follow_connection_refused() {
2892 let cli = Cli {
2893 node: "localhost:1".into(),
2894 token: None,
2895 command: Commands::Logs {
2896 name: "test".into(),
2897 lines: 50,
2898 follow: true,
2899 },
2900 };
2901 let result = execute(&cli).await;
2902 let err = result.unwrap_err().to_string();
2903 assert!(
2904 err.contains("Could not connect to pulpod"),
2905 "Expected friendly error, got: {err}"
2906 );
2907 }
2908
2909 #[tokio::test]
2910 async fn test_follow_logs_exits_on_dead() {
2911 use axum::{Router, extract::Path, extract::Query, routing::get};
2912
2913 let app = Router::new()
2914 .route(
2915 "/api/v1/sessions/{id}/output",
2916 get(
2917 |_path: Path<String>,
2918 _query: Query<std::collections::HashMap<String, String>>| async {
2919 r#"{"output":"some output"}"#.to_owned()
2920 },
2921 ),
2922 )
2923 .route(
2924 "/api/v1/sessions/{id}",
2925 get(|_path: Path<String>| async {
2926 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2927 }),
2928 );
2929
2930 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2931 let addr = listener.local_addr().unwrap();
2932 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2933 let base = format!("http://127.0.0.1:{}", addr.port());
2934
2935 let client = reqwest::Client::new();
2936 let mut buf = Vec::new();
2937 follow_logs(&client, &base, "test", 100, None, &mut buf)
2938 .await
2939 .unwrap();
2940
2941 let output = String::from_utf8(buf).unwrap();
2942 assert!(output.contains("some output"));
2943 }
2944
2945 #[tokio::test]
2946 async fn test_follow_logs_exits_on_stale() {
2947 use axum::{Router, extract::Path, extract::Query, routing::get};
2948
2949 let app = Router::new()
2950 .route(
2951 "/api/v1/sessions/{id}/output",
2952 get(
2953 |_path: Path<String>,
2954 _query: Query<std::collections::HashMap<String, String>>| async {
2955 r#"{"output":"stale output"}"#.to_owned()
2956 },
2957 ),
2958 )
2959 .route(
2960 "/api/v1/sessions/{id}",
2961 get(|_path: Path<String>| async {
2962 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2963 }),
2964 );
2965
2966 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2967 let addr = listener.local_addr().unwrap();
2968 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2969 let base = format!("http://127.0.0.1:{}", addr.port());
2970
2971 let client = reqwest::Client::new();
2972 let mut buf = Vec::new();
2973 follow_logs(&client, &base, "test", 100, None, &mut buf)
2974 .await
2975 .unwrap();
2976
2977 let output = String::from_utf8(buf).unwrap();
2978 assert!(output.contains("stale output"));
2979 }
2980
2981 #[tokio::test]
2982 async fn test_execute_logs_follow_non_reqwest_error() {
2983 use axum::{Router, extract::Path, extract::Query, routing::get};
2984
2985 let app = Router::new()
2987 .route(
2988 "/api/v1/sessions/{id}/output",
2989 get(
2990 |_path: Path<String>,
2991 _query: Query<std::collections::HashMap<String, String>>| async {
2992 r#"{"output":"initial"}"#.to_owned()
2993 },
2994 ),
2995 )
2996 .route(
2997 "/api/v1/sessions/{id}",
2998 get(|_path: Path<String>| async { "not valid json".to_owned() }),
2999 );
3000
3001 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3002 let addr = listener.local_addr().unwrap();
3003 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3004 let node = format!("127.0.0.1:{}", addr.port());
3005
3006 let cli = Cli {
3007 node,
3008 token: None,
3009 command: Commands::Logs {
3010 name: "test".into(),
3011 lines: 100,
3012 follow: true,
3013 },
3014 };
3015 let err = execute(&cli).await.unwrap_err();
3016 let msg = err.to_string();
3018 assert!(
3019 msg.contains("expected ident"),
3020 "Expected serde parse error, got: {msg}"
3021 );
3022 }
3023
3024 #[test]
3025 fn test_cli_parse_spawn_with_guardrails() {
3026 let cli = Cli::try_parse_from([
3027 "pulpo",
3028 "spawn",
3029 "--workdir",
3030 "/tmp",
3031 "--max-turns",
3032 "10",
3033 "--max-budget",
3034 "5.5",
3035 "--output-format",
3036 "json",
3037 "Do it",
3038 ])
3039 .unwrap();
3040 assert!(matches!(
3041 &cli.command,
3042 Commands::Spawn { max_turns, max_budget, output_format, .. }
3043 if *max_turns == Some(10) && *max_budget == Some(5.5)
3044 && output_format.as_deref() == Some("json")
3045 ));
3046 }
3047
3048 #[tokio::test]
3049 async fn test_fetch_session_status_connection_error() {
3050 let client = reqwest::Client::new();
3051 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3052 assert!(result.is_err());
3053 }
3054
3055 #[test]
3058 fn test_build_crontab_line() {
3059 let line = build_crontab_line(
3060 "nightly-review",
3061 "0 3 * * *",
3062 "/home/me/repo",
3063 "claude",
3064 "Review PRs",
3065 "localhost:7433",
3066 );
3067 assert_eq!(
3068 line,
3069 "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n"
3070 );
3071 }
3072
3073 #[test]
3074 fn test_crontab_install_success() {
3075 let crontab = "# existing cron\n0 * * * * echo hi\n";
3076 let line = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";
3077 let result = crontab_install(crontab, "my-job", line).unwrap();
3078 assert!(result.starts_with("# existing cron\n"));
3079 assert!(result.ends_with("#pulpo:my-job\n"));
3080 assert!(result.contains("echo hi"));
3081 }
3082
3083 #[test]
3084 fn test_crontab_install_duplicate_error() {
3085 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3086 let line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
3087 let err = crontab_install(crontab, "my-job", line).unwrap_err();
3088 assert!(err.to_string().contains("already exists"));
3089 }
3090
3091 #[test]
3092 fn test_crontab_list_empty() {
3093 assert_eq!(crontab_list(""), "No pulpo schedules.");
3094 }
3095
3096 #[test]
3097 fn test_crontab_list_no_pulpo_entries() {
3098 assert_eq!(crontab_list("0 * * * * echo hi\n"), "No pulpo schedules.");
3099 }
3100
3101 #[test]
3102 fn test_crontab_list_with_entries() {
3103 let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
3104 let output = crontab_list(crontab);
3105 assert!(output.contains("NAME"));
3106 assert!(output.contains("CRON"));
3107 assert!(output.contains("PAUSED"));
3108 assert!(output.contains("nightly"));
3109 assert!(output.contains("0 3 * * *"));
3110 assert!(output.contains("no"));
3111 }
3112
3113 #[test]
3114 fn test_crontab_list_paused_entry() {
3115 let crontab = "#0 3 * * * pulpo spawn task #pulpo:paused-job\n";
3116 let output = crontab_list(crontab);
3117 assert!(output.contains("paused-job"));
3118 assert!(output.contains("yes"));
3119 }
3120
3121 #[test]
3122 fn test_crontab_list_short_line() {
3123 let crontab = "badcron #pulpo:broken\n";
3125 let output = crontab_list(crontab);
3126 assert!(output.contains("broken"));
3127 assert!(output.contains('?'));
3128 }
3129
3130 #[test]
3131 fn test_crontab_remove_success() {
3132 let crontab = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
3133 let result = crontab_remove(crontab, "my-job").unwrap();
3134 assert!(result.contains("echo hi"));
3135 assert!(!result.contains("my-job"));
3136 }
3137
3138 #[test]
3139 fn test_crontab_remove_not_found() {
3140 let crontab = "0 * * * * echo hi\n";
3141 let err = crontab_remove(crontab, "ghost").unwrap_err();
3142 assert!(err.to_string().contains("not found"));
3143 }
3144
3145 #[test]
3146 fn test_crontab_pause_success() {
3147 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3148 let result = crontab_pause(crontab, "my-job").unwrap();
3149 assert!(result.starts_with('#'));
3150 assert!(result.contains("#pulpo:my-job"));
3151 }
3152
3153 #[test]
3154 fn test_crontab_pause_not_found() {
3155 let crontab = "0 * * * * echo hi\n";
3156 let err = crontab_pause(crontab, "ghost").unwrap_err();
3157 assert!(err.to_string().contains("not found or already paused"));
3158 }
3159
3160 #[test]
3161 fn test_crontab_pause_already_paused() {
3162 let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3163 let err = crontab_pause(crontab, "my-job").unwrap_err();
3164 assert!(err.to_string().contains("already paused"));
3165 }
3166
3167 #[test]
3168 fn test_crontab_resume_success() {
3169 let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3170 let result = crontab_resume(crontab, "my-job").unwrap();
3171 assert!(!result.starts_with('#'));
3172 assert!(result.contains("#pulpo:my-job"));
3173 }
3174
3175 #[test]
3176 fn test_crontab_resume_not_found() {
3177 let crontab = "0 * * * * echo hi\n";
3178 let err = crontab_resume(crontab, "ghost").unwrap_err();
3179 assert!(err.to_string().contains("not found or not paused"));
3180 }
3181
3182 #[test]
3183 fn test_crontab_resume_not_paused() {
3184 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3185 let err = crontab_resume(crontab, "my-job").unwrap_err();
3186 assert!(err.to_string().contains("not paused"));
3187 }
3188
3189 #[test]
3192 fn test_cli_parse_schedule_install() {
3193 let cli = Cli::try_parse_from([
3194 "pulpo",
3195 "schedule",
3196 "install",
3197 "nightly",
3198 "0 3 * * *",
3199 "--workdir",
3200 "/tmp/repo",
3201 "Review",
3202 "PRs",
3203 ])
3204 .unwrap();
3205 assert!(matches!(
3206 &cli.command,
3207 Commands::Schedule {
3208 action: ScheduleAction::Install { name, cron, workdir, provider, prompt }
3209 } if name == "nightly" && cron == "0 3 * * *" && workdir == "/tmp/repo"
3210 && provider == "claude" && prompt == &["Review", "PRs"]
3211 ));
3212 }
3213
3214 #[test]
3215 fn test_cli_parse_schedule_list() {
3216 let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3217 assert!(matches!(
3218 &cli.command,
3219 Commands::Schedule {
3220 action: ScheduleAction::List
3221 }
3222 ));
3223 }
3224
3225 #[test]
3226 fn test_cli_parse_schedule_remove() {
3227 let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3228 assert!(matches!(
3229 &cli.command,
3230 Commands::Schedule {
3231 action: ScheduleAction::Remove { name }
3232 } if name == "nightly"
3233 ));
3234 }
3235
3236 #[test]
3237 fn test_cli_parse_schedule_pause() {
3238 let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3239 assert!(matches!(
3240 &cli.command,
3241 Commands::Schedule {
3242 action: ScheduleAction::Pause { name }
3243 } if name == "nightly"
3244 ));
3245 }
3246
3247 #[test]
3248 fn test_cli_parse_schedule_resume() {
3249 let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3250 assert!(matches!(
3251 &cli.command,
3252 Commands::Schedule {
3253 action: ScheduleAction::Resume { name }
3254 } if name == "nightly"
3255 ));
3256 }
3257
3258 #[test]
3259 fn test_cli_parse_schedule_alias() {
3260 let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3261 assert!(matches!(
3262 &cli.command,
3263 Commands::Schedule {
3264 action: ScheduleAction::List
3265 }
3266 ));
3267 }
3268
3269 #[test]
3270 fn test_cli_parse_schedule_list_alias() {
3271 let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3272 assert!(matches!(
3273 &cli.command,
3274 Commands::Schedule {
3275 action: ScheduleAction::List
3276 }
3277 ));
3278 }
3279
3280 #[test]
3281 fn test_cli_parse_schedule_remove_alias() {
3282 let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3283 assert!(matches!(
3284 &cli.command,
3285 Commands::Schedule {
3286 action: ScheduleAction::Remove { name }
3287 } if name == "nightly"
3288 ));
3289 }
3290
3291 #[test]
3292 fn test_cli_parse_schedule_install_custom_provider() {
3293 let cli = Cli::try_parse_from([
3294 "pulpo",
3295 "schedule",
3296 "install",
3297 "daily",
3298 "0 9 * * *",
3299 "--workdir",
3300 "/tmp",
3301 "--provider",
3302 "codex",
3303 "Run tests",
3304 ])
3305 .unwrap();
3306 assert!(matches!(
3307 &cli.command,
3308 Commands::Schedule {
3309 action: ScheduleAction::Install { provider, .. }
3310 } if provider == "codex"
3311 ));
3312 }
3313
3314 #[tokio::test]
3315 async fn test_execute_schedule_via_execute() {
3316 let node = start_test_server().await;
3318 let cli = Cli {
3319 node,
3320 token: None,
3321 command: Commands::Schedule {
3322 action: ScheduleAction::List,
3323 },
3324 };
3325 let result = execute(&cli).await;
3326 assert!(result.is_ok() || result.is_err());
3328 }
3329
3330 #[test]
3331 fn test_schedule_action_debug() {
3332 let action = ScheduleAction::List;
3333 assert_eq!(format!("{action:?}"), "List");
3334 }
3335
3336 #[test]
3339 fn test_cli_parse_knowledge() {
3340 let cli = Cli::try_parse_from(["pulpo", "knowledge"]).unwrap();
3341 assert!(matches!(cli.command, Commands::Knowledge { .. }));
3342 }
3343
3344 #[test]
3345 fn test_cli_parse_knowledge_alias() {
3346 let cli = Cli::try_parse_from(["pulpo", "kn"]).unwrap();
3347 assert!(matches!(cli.command, Commands::Knowledge { .. }));
3348 }
3349
3350 #[test]
3351 fn test_cli_parse_knowledge_with_filters() {
3352 let cli = Cli::try_parse_from([
3353 "pulpo",
3354 "knowledge",
3355 "--kind",
3356 "failure",
3357 "--repo",
3358 "/tmp/repo",
3359 "--ink",
3360 "coder",
3361 "--limit",
3362 "5",
3363 ])
3364 .unwrap();
3365 match &cli.command {
3366 Commands::Knowledge {
3367 kind,
3368 repo,
3369 ink,
3370 limit,
3371 ..
3372 } => {
3373 assert_eq!(kind.as_deref(), Some("failure"));
3374 assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3375 assert_eq!(ink.as_deref(), Some("coder"));
3376 assert_eq!(*limit, 5);
3377 }
3378 _ => panic!("expected Knowledge command"),
3379 }
3380 }
3381
3382 #[test]
3383 fn test_cli_parse_knowledge_context() {
3384 let cli = Cli::try_parse_from(["pulpo", "knowledge", "--context", "--repo", "/tmp/repo"])
3385 .unwrap();
3386 match &cli.command {
3387 Commands::Knowledge { context, repo, .. } => {
3388 assert!(*context);
3389 assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3390 }
3391 _ => panic!("expected Knowledge command"),
3392 }
3393 }
3394
3395 #[test]
3396 fn test_format_knowledge_empty() {
3397 assert_eq!(format_knowledge(&[]), "No knowledge found.");
3398 }
3399
3400 #[test]
3401 fn test_format_knowledge_items() {
3402 use chrono::Utc;
3403 use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3404 use uuid::Uuid;
3405
3406 let items = vec![
3407 Knowledge {
3408 id: Uuid::new_v4(),
3409 session_id: Uuid::new_v4(),
3410 kind: KnowledgeKind::Summary,
3411 scope_repo: Some("/tmp/repo".into()),
3412 scope_ink: Some("coder".into()),
3413 title: "Fixed the auth bug".into(),
3414 body: "Details".into(),
3415 tags: vec!["claude".into(), "completed".into()],
3416 relevance: 0.7,
3417 created_at: Utc::now(),
3418 },
3419 Knowledge {
3420 id: Uuid::new_v4(),
3421 session_id: Uuid::new_v4(),
3422 kind: KnowledgeKind::Failure,
3423 scope_repo: None,
3424 scope_ink: None,
3425 title: "OOM crash during build".into(),
3426 body: "Details".into(),
3427 tags: vec!["failure".into()],
3428 relevance: 0.9,
3429 created_at: Utc::now(),
3430 },
3431 ];
3432
3433 let output = format_knowledge(&items);
3434 assert!(output.contains("KIND"));
3435 assert!(output.contains("TITLE"));
3436 assert!(output.contains("summary"));
3437 assert!(output.contains("failure"));
3438 assert!(output.contains("Fixed the auth bug"));
3439 assert!(output.contains("repo"));
3440 assert!(output.contains("0.70"));
3441 }
3442
3443 #[test]
3444 fn test_format_knowledge_long_title_truncated() {
3445 use chrono::Utc;
3446 use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3447 use uuid::Uuid;
3448
3449 let items = vec![Knowledge {
3450 id: Uuid::new_v4(),
3451 session_id: Uuid::new_v4(),
3452 kind: KnowledgeKind::Summary,
3453 scope_repo: Some("/repo".into()),
3454 scope_ink: None,
3455 title: "A very long title that exceeds the maximum display width for knowledge items in the CLI".into(),
3456 body: "Body".into(),
3457 tags: vec![],
3458 relevance: 0.5,
3459 created_at: Utc::now(),
3460 }];
3461
3462 let output = format_knowledge(&items);
3463 assert!(output.contains('…'));
3464 }
3465
3466 #[test]
3467 fn test_cli_parse_knowledge_get() {
3468 let cli = Cli::try_parse_from(["pulpo", "knowledge", "--get", "abc-123"]).unwrap();
3469 match &cli.command {
3470 Commands::Knowledge { get, .. } => {
3471 assert_eq!(get.as_deref(), Some("abc-123"));
3472 }
3473 _ => panic!("expected Knowledge command"),
3474 }
3475 }
3476
3477 #[test]
3478 fn test_cli_parse_knowledge_delete() {
3479 let cli = Cli::try_parse_from(["pulpo", "knowledge", "--delete", "abc-123"]).unwrap();
3480 match &cli.command {
3481 Commands::Knowledge { delete, .. } => {
3482 assert_eq!(delete.as_deref(), Some("abc-123"));
3483 }
3484 _ => panic!("expected Knowledge command"),
3485 }
3486 }
3487
3488 #[test]
3489 fn test_cli_parse_knowledge_push() {
3490 let cli = Cli::try_parse_from(["pulpo", "knowledge", "--push"]).unwrap();
3491 match &cli.command {
3492 Commands::Knowledge { push, .. } => {
3493 assert!(*push);
3494 }
3495 _ => panic!("expected Knowledge command"),
3496 }
3497 }
3498
3499 #[test]
3500 fn test_format_knowledge_no_repo() {
3501 use chrono::Utc;
3502 use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3503 use uuid::Uuid;
3504
3505 let items = vec![Knowledge {
3506 id: Uuid::new_v4(),
3507 session_id: Uuid::new_v4(),
3508 kind: KnowledgeKind::Summary,
3509 scope_repo: None,
3510 scope_ink: None,
3511 title: "Global finding".into(),
3512 body: "Body".into(),
3513 tags: vec![],
3514 relevance: 0.5,
3515 created_at: Utc::now(),
3516 }];
3517
3518 let output = format_knowledge(&items);
3519 assert!(output.contains('-')); }
3521}