1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4 AuthTokenResponse, CreateSessionResponse, InterventionEventResponse, KnowledgeDeleteResponse,
5 KnowledgeItemResponse, KnowledgePushResponse, KnowledgeResponse, PeersResponse,
6};
7use pulpo_common::knowledge::Knowledge;
8use pulpo_common::session::Session;
9
10#[derive(Parser, Debug)]
11#[command(
12 name = "pulpo",
13 about = "Manage agent sessions across your machines",
14 version
15)]
16pub struct Cli {
17 #[arg(long, default_value = "localhost:7433")]
19 pub node: String,
20
21 #[arg(long)]
23 pub token: Option<String>,
24
25 #[command(subcommand)]
26 pub command: Commands,
27}
28
29#[derive(Subcommand, Debug)]
30#[allow(clippy::large_enum_variant)]
31pub enum Commands {
32 #[command(visible_alias = "a")]
34 Attach {
35 name: String,
37 },
38
39 #[command(visible_alias = "i")]
41 Input {
42 name: String,
44 text: Option<String>,
46 },
47
48 #[command(visible_alias = "s")]
50 Spawn {
51 #[arg(long)]
53 workdir: Option<String>,
54
55 #[arg(long)]
57 name: Option<String>,
58
59 #[arg(long)]
61 provider: Option<String>,
62
63 #[arg(long)]
65 auto: bool,
66
67 #[arg(long)]
69 unrestricted: bool,
70
71 #[arg(long)]
73 model: Option<String>,
74
75 #[arg(long)]
77 system_prompt: Option<String>,
78
79 #[arg(long, value_delimiter = ',')]
81 allowed_tools: Option<Vec<String>>,
82
83 #[arg(long)]
85 ink: Option<String>,
86
87 #[arg(long)]
89 max_turns: Option<u32>,
90
91 #[arg(long)]
93 max_budget: Option<f64>,
94
95 #[arg(long)]
97 output_format: Option<String>,
98
99 prompt: Vec<String>,
101 },
102
103 #[command(visible_alias = "ls")]
105 List,
106
107 #[command(visible_alias = "l")]
109 Logs {
110 name: String,
112
113 #[arg(long, default_value = "100")]
115 lines: usize,
116
117 #[arg(short, long)]
119 follow: bool,
120 },
121
122 #[command(visible_alias = "k")]
124 Kill {
125 name: String,
127 },
128
129 #[command(visible_alias = "rm")]
131 Delete {
132 name: String,
134 },
135
136 #[command(visible_alias = "r")]
138 Resume {
139 name: String,
141 },
142
143 #[command(visible_alias = "n")]
145 Nodes,
146
147 #[command(visible_alias = "iv")]
149 Interventions {
150 name: String,
152 },
153
154 Ui,
156
157 #[command(visible_alias = "kn")]
159 Knowledge {
160 #[arg(long)]
162 session: Option<String>,
163
164 #[arg(long)]
166 kind: Option<String>,
167
168 #[arg(long)]
170 repo: Option<String>,
171
172 #[arg(long)]
174 ink: Option<String>,
175
176 #[arg(long, default_value = "20")]
178 limit: usize,
179
180 #[arg(long)]
182 context: bool,
183
184 #[arg(long)]
186 get: Option<String>,
187
188 #[arg(long)]
190 delete: Option<String>,
191
192 #[arg(long)]
194 push: bool,
195 },
196
197 #[command(visible_alias = "sched")]
199 Schedule {
200 #[command(subcommand)]
201 action: ScheduleAction,
202 },
203}
204
205#[derive(Subcommand, Debug)]
206pub enum ScheduleAction {
207 Install {
209 name: String,
211 cron: String,
213 #[arg(long)]
215 workdir: String,
216 #[arg(long, default_value = "claude")]
218 provider: String,
219 prompt: Vec<String>,
221 },
222 #[command(alias = "ls")]
224 List,
225 #[command(alias = "rm")]
227 Remove {
228 name: String,
230 },
231 Pause {
233 name: String,
235 },
236 Resume {
238 name: String,
240 },
241}
242
243pub fn base_url(node: &str) -> String {
245 format!("http://{node}")
246}
247
248#[derive(serde::Deserialize)]
250struct OutputResponse {
251 output: String,
252}
253
254fn format_sessions(sessions: &[Session]) -> String {
256 if sessions.is_empty() {
257 return "No sessions.".into();
258 }
259 let mut lines = vec![format!(
260 "{:<20} {:<12} {:<10} {:<14} {}",
261 "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
262 )];
263 for s in sessions {
264 let prompt_display = if s.prompt.len() > 40 {
265 format!("{}...", &s.prompt[..37])
266 } else {
267 s.prompt.clone()
268 };
269 let status_display = if s.waiting_for_input {
270 "waiting".to_owned()
271 } else {
272 s.status.to_string()
273 };
274 lines.push(format!(
275 "{:<20} {:<12} {:<10} {:<14} {}",
276 s.name, status_display, s.provider, s.mode, prompt_display
277 ));
278 }
279 lines.join("\n")
280}
281
282fn format_nodes(resp: &PeersResponse) -> String {
284 let mut lines = vec![format!(
285 "{:<20} {:<25} {:<10} {}",
286 "NAME", "ADDRESS", "STATUS", "SESSIONS"
287 )];
288 lines.push(format!(
289 "{:<20} {:<25} {:<10} {}",
290 resp.local.name, "(local)", "online", "-"
291 ));
292 for p in &resp.peers {
293 let sessions = p
294 .session_count
295 .map_or_else(|| "-".into(), |c| c.to_string());
296 lines.push(format!(
297 "{:<20} {:<25} {:<10} {}",
298 p.name, p.address, p.status, sessions
299 ));
300 }
301 lines.join("\n")
302}
303
304fn format_interventions(events: &[InterventionEventResponse]) -> String {
306 if events.is_empty() {
307 return "No intervention events.".into();
308 }
309 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
310 for e in events {
311 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
312 }
313 lines.join("\n")
314}
315
316fn format_knowledge(items: &[Knowledge]) -> String {
318 if items.is_empty() {
319 return "No knowledge found.".into();
320 }
321 let mut lines = vec![format!(
322 "{:<10} {:<40} {:<10} {:<6} {}",
323 "KIND", "TITLE", "REPO", "REL", "TAGS"
324 )];
325 for k in items {
326 let title = if k.title.len() > 38 {
327 format!("{}…", &k.title[..37])
328 } else {
329 k.title.clone()
330 };
331 let repo = k
332 .scope_repo
333 .as_deref()
334 .and_then(|r| r.rsplit('/').next())
335 .unwrap_or("-");
336 let tags = k.tags.join(",");
337 lines.push(format!(
338 "{:<10} {:<40} {:<10} {:<6.2} {}",
339 k.kind, title, repo, k.relevance, tags
340 ));
341 }
342 lines.join("\n")
343}
344
345#[cfg_attr(coverage, allow(dead_code))]
347fn build_open_command(url: &str) -> std::process::Command {
348 #[cfg(target_os = "macos")]
349 {
350 let mut cmd = std::process::Command::new("open");
351 cmd.arg(url);
352 cmd
353 }
354 #[cfg(target_os = "linux")]
355 {
356 let mut cmd = std::process::Command::new("xdg-open");
357 cmd.arg(url);
358 cmd
359 }
360 #[cfg(not(any(target_os = "macos", target_os = "linux")))]
361 {
362 let mut cmd = std::process::Command::new("xdg-open");
364 cmd.arg(url);
365 cmd
366 }
367}
368
369#[cfg(not(coverage))]
371fn open_browser(url: &str) -> Result<()> {
372 build_open_command(url).status()?;
373 Ok(())
374}
375
376#[cfg(coverage)]
378fn open_browser(_url: &str) -> Result<()> {
379 Ok(())
380}
381
382#[cfg_attr(coverage, allow(dead_code))]
385fn build_attach_command(backend_session_id: &str) -> std::process::Command {
386 let mut cmd = std::process::Command::new("tmux");
387 cmd.args(["attach-session", "-t", backend_session_id]);
388 cmd
389}
390
391#[cfg(not(any(test, coverage)))]
393fn attach_session(backend_session_id: &str) -> Result<()> {
394 let status = build_attach_command(backend_session_id).status()?;
395 if !status.success() {
396 anyhow::bail!("attach failed with {status}");
397 }
398 Ok(())
399}
400
401#[cfg(any(test, coverage))]
403#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
404fn attach_session(_backend_session_id: &str) -> Result<()> {
405 Ok(())
406}
407
408fn api_error(text: &str) -> anyhow::Error {
410 serde_json::from_str::<serde_json::Value>(text)
411 .ok()
412 .and_then(|v| v["error"].as_str().map(String::from))
413 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
414}
415
416async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
418 if resp.status().is_success() {
419 Ok(resp.text().await?)
420 } else {
421 let text = resp.text().await?;
422 Err(api_error(&text))
423 }
424}
425
426fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
428 if err.is_connect() {
429 anyhow::anyhow!(
430 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
431 )
432 } else {
433 anyhow::anyhow!("Network error connecting to {node}: {err}")
434 }
435}
436
437fn is_localhost(node: &str) -> bool {
439 let host = node.split(':').next().unwrap_or(node);
440 host == "localhost" || host == "127.0.0.1" || node.starts_with("[::1]") || node == "::1"
441}
442
443async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
445 let resp = client
446 .get(format!("{base}/api/v1/auth/token"))
447 .send()
448 .await
449 .ok()?;
450 let body: AuthTokenResponse = resp.json().await.ok()?;
451 if body.token.is_empty() {
452 None
453 } else {
454 Some(body.token)
455 }
456}
457
458async fn resolve_token(
460 client: &reqwest::Client,
461 base: &str,
462 node: &str,
463 explicit: Option<&str>,
464) -> Option<String> {
465 if let Some(t) = explicit {
466 return Some(t.to_owned());
467 }
468 if is_localhost(node) {
469 return discover_token(client, base).await;
470 }
471 None
472}
473
474fn authed_get(
476 client: &reqwest::Client,
477 url: String,
478 token: Option<&str>,
479) -> reqwest::RequestBuilder {
480 let req = client.get(url);
481 if let Some(t) = token {
482 req.bearer_auth(t)
483 } else {
484 req
485 }
486}
487
488fn authed_post(
490 client: &reqwest::Client,
491 url: String,
492 token: Option<&str>,
493) -> reqwest::RequestBuilder {
494 let req = client.post(url);
495 if let Some(t) = token {
496 req.bearer_auth(t)
497 } else {
498 req
499 }
500}
501
502fn authed_delete(
504 client: &reqwest::Client,
505 url: String,
506 token: Option<&str>,
507) -> reqwest::RequestBuilder {
508 let req = client.delete(url);
509 if let Some(t) = token {
510 req.bearer_auth(t)
511 } else {
512 req
513 }
514}
515
516async fn fetch_output(
518 client: &reqwest::Client,
519 base: &str,
520 name: &str,
521 lines: usize,
522 token: Option<&str>,
523) -> Result<String> {
524 let resp = authed_get(
525 client,
526 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
527 token,
528 )
529 .send()
530 .await?;
531 let text = ok_or_api_error(resp).await?;
532 let output: OutputResponse = serde_json::from_str(&text)?;
533 Ok(output.output)
534}
535
536async fn fetch_session_status(
538 client: &reqwest::Client,
539 base: &str,
540 name: &str,
541 token: Option<&str>,
542) -> Result<String> {
543 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
544 .send()
545 .await?;
546 let text = ok_or_api_error(resp).await?;
547 let session: Session = serde_json::from_str(&text)?;
548 Ok(session.status.to_string())
549}
550
551fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
558 if prev.is_empty() {
559 return new;
560 }
561
562 let prev_lines: Vec<&str> = prev.lines().collect();
563 let new_lines: Vec<&str> = new.lines().collect();
564
565 if new_lines.is_empty() {
566 return "";
567 }
568
569 let last_prev = prev_lines[prev_lines.len() - 1];
571
572 for i in (0..new_lines.len()).rev() {
574 if new_lines[i] == last_prev {
575 let overlap_len = prev_lines.len().min(i + 1);
577 let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
578 let new_overlap = &new_lines[i + 1 - overlap_len..=i];
579 if prev_tail == new_overlap {
580 if i + 1 < new_lines.len() {
581 let consumed: usize = new_lines[..=i].iter().map(|l| l.len() + 1).sum();
583 return new.get(consumed.min(new.len())..).unwrap_or("");
584 }
585 return "";
586 }
587 }
588 }
589
590 new
592}
593
594async fn follow_logs(
596 client: &reqwest::Client,
597 base: &str,
598 name: &str,
599 lines: usize,
600 token: Option<&str>,
601 writer: &mut (dyn std::io::Write + Send),
602) -> Result<()> {
603 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
604 write!(writer, "{prev_output}")?;
605
606 loop {
607 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
608
609 let status = fetch_session_status(client, base, name, token).await?;
611 let is_terminal = status == "completed" || status == "dead" || status == "stale";
612
613 let new_output = fetch_output(client, base, name, lines, token).await?;
615
616 let diff = diff_output(&prev_output, &new_output);
617 if !diff.is_empty() {
618 write!(writer, "{diff}")?;
619 }
620 prev_output = new_output;
621
622 if is_terminal {
623 break;
624 }
625 }
626 Ok(())
627}
628
629#[cfg_attr(coverage, allow(dead_code))]
632const CRONTAB_TAG: &str = "#pulpo:";
633
634#[cfg(not(coverage))]
636fn read_crontab() -> Result<String> {
637 let output = std::process::Command::new("crontab").arg("-l").output()?;
638 if output.status.success() {
639 Ok(String::from_utf8_lossy(&output.stdout).to_string())
640 } else {
641 Ok(String::new())
642 }
643}
644
645#[cfg(not(coverage))]
647fn write_crontab(content: &str) -> Result<()> {
648 use std::io::Write;
649 let mut child = std::process::Command::new("crontab")
650 .arg("-")
651 .stdin(std::process::Stdio::piped())
652 .spawn()?;
653 child
654 .stdin
655 .as_mut()
656 .unwrap()
657 .write_all(content.as_bytes())?;
658 let status = child.wait()?;
659 if !status.success() {
660 anyhow::bail!("crontab write failed");
661 }
662 Ok(())
663}
664
665#[cfg_attr(coverage, allow(dead_code))]
667fn build_crontab_line(
668 name: &str,
669 cron: &str,
670 workdir: &str,
671 provider: &str,
672 prompt: &str,
673 node: &str,
674) -> String {
675 format!(
676 "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
677 )
678}
679
680#[cfg_attr(coverage, allow(dead_code))]
682fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
683 let tag = format!("{CRONTAB_TAG}{name}");
684 if crontab.contains(&tag) {
685 anyhow::bail!("schedule \"{name}\" already exists — remove it first");
686 }
687 let mut result = crontab.to_owned();
688 result.push_str(line);
689 Ok(result)
690}
691
692#[cfg_attr(coverage, allow(dead_code))]
694fn crontab_list(crontab: &str) -> String {
695 let entries: Vec<&str> = crontab
696 .lines()
697 .filter(|l| l.contains(CRONTAB_TAG))
698 .collect();
699 if entries.is_empty() {
700 return "No pulpo schedules.".into();
701 }
702 let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
703 for entry in entries {
704 let paused = entry.starts_with('#');
705 let raw = entry.trim_start_matches('#').trim();
706 let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
707 let parts: Vec<&str> = raw.splitn(6, ' ').collect();
708 let cron_expr = if parts.len() >= 5 {
709 parts[..5].join(" ")
710 } else {
711 "?".into()
712 };
713 lines.push(format!(
714 "{:<20} {:<15} {}",
715 name,
716 cron_expr,
717 if paused { "yes" } else { "no" }
718 ));
719 }
720 lines.join("\n")
721}
722
723#[cfg_attr(coverage, allow(dead_code))]
725fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
726 use std::fmt::Write;
727 let tag = format!("{CRONTAB_TAG}{name}");
728 let filtered =
729 crontab
730 .lines()
731 .filter(|l| !l.contains(&tag))
732 .fold(String::new(), |mut acc, l| {
733 writeln!(acc, "{l}").unwrap();
734 acc
735 });
736 if filtered.len() == crontab.len() {
737 anyhow::bail!("schedule \"{name}\" not found");
738 }
739 Ok(filtered)
740}
741
742#[cfg_attr(coverage, allow(dead_code))]
744fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
745 use std::fmt::Write;
746 let tag = format!("{CRONTAB_TAG}{name}");
747 let mut found = false;
748 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
749 if l.contains(&tag) && !l.starts_with('#') {
750 found = true;
751 writeln!(acc, "#{l}").unwrap();
752 } else {
753 writeln!(acc, "{l}").unwrap();
754 }
755 acc
756 });
757 if !found {
758 anyhow::bail!("schedule \"{name}\" not found or already paused");
759 }
760 Ok(updated)
761}
762
763#[cfg_attr(coverage, allow(dead_code))]
765fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
766 use std::fmt::Write;
767 let tag = format!("{CRONTAB_TAG}{name}");
768 let mut found = false;
769 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
770 if l.contains(&tag) && l.starts_with('#') {
771 found = true;
772 writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
773 } else {
774 writeln!(acc, "{l}").unwrap();
775 }
776 acc
777 });
778 if !found {
779 anyhow::bail!("schedule \"{name}\" not found or not paused");
780 }
781 Ok(updated)
782}
783
784#[cfg(not(coverage))]
786fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
787 match action {
788 ScheduleAction::Install {
789 name,
790 cron,
791 workdir,
792 provider,
793 prompt,
794 } => {
795 let crontab = read_crontab()?;
796 let joined_prompt = prompt.join(" ");
797 let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
798 let updated = crontab_install(&crontab, name, &line)?;
799 write_crontab(&updated)?;
800 Ok(format!("Installed schedule \"{name}\""))
801 }
802 ScheduleAction::List => {
803 let crontab = read_crontab()?;
804 Ok(crontab_list(&crontab))
805 }
806 ScheduleAction::Remove { name } => {
807 let crontab = read_crontab()?;
808 let updated = crontab_remove(&crontab, name)?;
809 write_crontab(&updated)?;
810 Ok(format!("Removed schedule \"{name}\""))
811 }
812 ScheduleAction::Pause { name } => {
813 let crontab = read_crontab()?;
814 let updated = crontab_pause(&crontab, name)?;
815 write_crontab(&updated)?;
816 Ok(format!("Paused schedule \"{name}\""))
817 }
818 ScheduleAction::Resume { name } => {
819 let crontab = read_crontab()?;
820 let updated = crontab_resume(&crontab, name)?;
821 write_crontab(&updated)?;
822 Ok(format!("Resumed schedule \"{name}\""))
823 }
824 }
825}
826
827#[cfg(coverage)]
829fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
830 Ok(String::new())
831}
832
833#[allow(clippy::too_many_lines)]
835pub async fn execute(cli: &Cli) -> Result<String> {
836 let url = base_url(&cli.node);
837 let client = reqwest::Client::new();
838 let node = &cli.node;
839 let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;
840
841 match &cli.command {
842 Commands::Attach { name } => {
843 let resp = authed_get(
845 &client,
846 format!("{url}/api/v1/sessions/{name}"),
847 token.as_deref(),
848 )
849 .send()
850 .await
851 .map_err(|e| friendly_error(&e, node))?;
852 let text = ok_or_api_error(resp).await?;
853 let session: Session = serde_json::from_str(&text)?;
854 let backend_id = session
855 .backend_session_id
856 .unwrap_or_else(|| format!("pulpo-{name}"));
857 attach_session(&backend_id)?;
858 Ok(format!("Detached from session {name}."))
859 }
860 Commands::Input { name, text } => {
861 let input_text = text.as_deref().unwrap_or("\n");
862 let body = serde_json::json!({ "text": input_text });
863 let resp = authed_post(
864 &client,
865 format!("{url}/api/v1/sessions/{name}/input"),
866 token.as_deref(),
867 )
868 .json(&body)
869 .send()
870 .await
871 .map_err(|e| friendly_error(&e, node))?;
872 ok_or_api_error(resp).await?;
873 Ok(format!("Sent input to session {name}."))
874 }
875 Commands::List => {
876 let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
877 .send()
878 .await
879 .map_err(|e| friendly_error(&e, node))?;
880 let text = ok_or_api_error(resp).await?;
881 let sessions: Vec<Session> = serde_json::from_str(&text)?;
882 Ok(format_sessions(&sessions))
883 }
884 Commands::Nodes => {
885 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
886 .send()
887 .await
888 .map_err(|e| friendly_error(&e, node))?;
889 let text = ok_or_api_error(resp).await?;
890 let resp: PeersResponse = serde_json::from_str(&text)?;
891 Ok(format_nodes(&resp))
892 }
893 Commands::Spawn {
894 workdir,
895 name,
896 provider,
897 auto,
898 unrestricted,
899 model,
900 system_prompt,
901 allowed_tools,
902 ink,
903 max_turns,
904 max_budget,
905 output_format,
906 prompt,
907 } => {
908 let prompt_text = prompt.join(" ");
909 let mode = if *auto { "autonomous" } else { "interactive" };
910 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
912 std::env::current_dir()
913 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
914 });
915 let mut body = serde_json::json!({
916 "workdir": resolved_workdir,
917 "mode": mode,
918 });
919 if !prompt_text.is_empty() {
921 body["prompt"] = serde_json::json!(prompt_text);
922 }
923 if let Some(p) = provider {
925 body["provider"] = serde_json::json!(p);
926 }
927 if *unrestricted {
928 body["unrestricted"] = serde_json::json!(true);
929 }
930 if let Some(n) = name {
931 body["name"] = serde_json::json!(n);
932 }
933 if let Some(m) = model {
934 body["model"] = serde_json::json!(m);
935 }
936 if let Some(sp) = system_prompt {
937 body["system_prompt"] = serde_json::json!(sp);
938 }
939 if let Some(tools) = allowed_tools {
940 body["allowed_tools"] = serde_json::json!(tools);
941 }
942 if let Some(p) = ink {
943 body["ink"] = serde_json::json!(p);
944 }
945 if let Some(mt) = max_turns {
946 body["max_turns"] = serde_json::json!(mt);
947 }
948 if let Some(mb) = max_budget {
949 body["max_budget_usd"] = serde_json::json!(mb);
950 }
951 if let Some(of) = output_format {
952 body["output_format"] = serde_json::json!(of);
953 }
954 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
955 .json(&body)
956 .send()
957 .await
958 .map_err(|e| friendly_error(&e, node))?;
959 let text = ok_or_api_error(resp).await?;
960 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
961 let mut msg = format!(
962 "Created session \"{}\" ({})",
963 resp.session.name, resp.session.id
964 );
965 for w in &resp.warnings {
966 use std::fmt::Write;
967 let _ = write!(msg, "\n Warning: {w}");
968 }
969 Ok(msg)
970 }
971 Commands::Kill { name } => {
972 let resp = authed_post(
973 &client,
974 format!("{url}/api/v1/sessions/{name}/kill"),
975 token.as_deref(),
976 )
977 .send()
978 .await
979 .map_err(|e| friendly_error(&e, node))?;
980 ok_or_api_error(resp).await?;
981 Ok(format!("Session {name} killed."))
982 }
983 Commands::Delete { name } => {
984 let resp = authed_delete(
985 &client,
986 format!("{url}/api/v1/sessions/{name}"),
987 token.as_deref(),
988 )
989 .send()
990 .await
991 .map_err(|e| friendly_error(&e, node))?;
992 ok_or_api_error(resp).await?;
993 Ok(format!("Session {name} deleted."))
994 }
995 Commands::Logs {
996 name,
997 lines,
998 follow,
999 } => {
1000 if *follow {
1001 let mut stdout = std::io::stdout();
1002 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1003 .await
1004 .map_err(|e| {
1005 match e.downcast::<reqwest::Error>() {
1007 Ok(re) => friendly_error(&re, node),
1008 Err(other) => other,
1009 }
1010 })?;
1011 Ok(String::new())
1012 } else {
1013 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1014 .await
1015 .map_err(|e| match e.downcast::<reqwest::Error>() {
1016 Ok(re) => friendly_error(&re, node),
1017 Err(other) => other,
1018 })?;
1019 Ok(output)
1020 }
1021 }
1022 Commands::Interventions { name } => {
1023 let resp = authed_get(
1024 &client,
1025 format!("{url}/api/v1/sessions/{name}/interventions"),
1026 token.as_deref(),
1027 )
1028 .send()
1029 .await
1030 .map_err(|e| friendly_error(&e, node))?;
1031 let text = ok_or_api_error(resp).await?;
1032 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1033 Ok(format_interventions(&events))
1034 }
1035 Commands::Ui => {
1036 let dashboard = base_url(&cli.node);
1037 open_browser(&dashboard)?;
1038 Ok(format!("Opening {dashboard}"))
1039 }
1040 Commands::Resume { name } => {
1041 let resp = authed_post(
1042 &client,
1043 format!("{url}/api/v1/sessions/{name}/resume"),
1044 token.as_deref(),
1045 )
1046 .send()
1047 .await
1048 .map_err(|e| friendly_error(&e, node))?;
1049 let text = ok_or_api_error(resp).await?;
1050 let session: Session = serde_json::from_str(&text)?;
1051 Ok(format!("Resumed session \"{}\"", session.name))
1052 }
1053 Commands::Knowledge {
1054 session,
1055 kind,
1056 repo,
1057 ink,
1058 limit,
1059 context,
1060 get,
1061 delete,
1062 push,
1063 } => {
1064 if let Some(id) = get {
1066 let endpoint = format!("{url}/api/v1/knowledge/{id}");
1067 let resp = authed_get(&client, endpoint, token.as_deref())
1068 .send()
1069 .await
1070 .map_err(|e| friendly_error(&e, node))?;
1071 let text = ok_or_api_error(resp).await?;
1072 let resp: KnowledgeItemResponse = serde_json::from_str(&text)?;
1073 return Ok(format_knowledge(&[resp.knowledge]));
1074 }
1075
1076 if let Some(id) = delete {
1078 let endpoint = format!("{url}/api/v1/knowledge/{id}");
1079 let resp = authed_delete(&client, endpoint, token.as_deref())
1080 .send()
1081 .await
1082 .map_err(|e| friendly_error(&e, node))?;
1083 let text = ok_or_api_error(resp).await?;
1084 let resp: KnowledgeDeleteResponse = serde_json::from_str(&text)?;
1085 return Ok(if resp.deleted {
1086 format!("Deleted knowledge item {id}")
1087 } else {
1088 format!("Knowledge item {id} not found")
1089 });
1090 }
1091
1092 if *push {
1094 let endpoint = format!("{url}/api/v1/knowledge/push");
1095 let resp = authed_post(&client, endpoint, token.as_deref())
1096 .send()
1097 .await
1098 .map_err(|e| friendly_error(&e, node))?;
1099 let text = ok_or_api_error(resp).await?;
1100 let resp: KnowledgePushResponse = serde_json::from_str(&text)?;
1101 return Ok(resp.message);
1102 }
1103
1104 let mut params = vec![format!("limit={limit}")];
1106 let endpoint = if *context {
1107 if let Some(r) = repo {
1108 params.push(format!("workdir={r}"));
1109 }
1110 if let Some(i) = ink {
1111 params.push(format!("ink={i}"));
1112 }
1113 format!("{url}/api/v1/knowledge/context?{}", params.join("&"))
1114 } else {
1115 if let Some(s) = session {
1116 params.push(format!("session_id={s}"));
1117 }
1118 if let Some(k) = kind {
1119 params.push(format!("kind={k}"));
1120 }
1121 if let Some(r) = repo {
1122 params.push(format!("repo={r}"));
1123 }
1124 if let Some(i) = ink {
1125 params.push(format!("ink={i}"));
1126 }
1127 format!("{url}/api/v1/knowledge?{}", params.join("&"))
1128 };
1129 let resp = authed_get(&client, endpoint, token.as_deref())
1130 .send()
1131 .await
1132 .map_err(|e| friendly_error(&e, node))?;
1133 let text = ok_or_api_error(resp).await?;
1134 let resp: KnowledgeResponse = serde_json::from_str(&text)?;
1135 Ok(format_knowledge(&resp.knowledge))
1136 }
1137 Commands::Schedule { action } => execute_schedule(action, node),
1138 }
1139}
1140
1141#[cfg(test)]
1142mod tests {
1143 use super::*;
1144
1145 #[test]
1146 fn test_base_url() {
1147 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1148 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1149 }
1150
1151 #[test]
1152 fn test_cli_parse_list() {
1153 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1154 assert_eq!(cli.node, "localhost:7433");
1155 assert!(matches!(cli.command, Commands::List));
1156 }
1157
1158 #[test]
1159 fn test_cli_parse_nodes() {
1160 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1161 assert!(matches!(cli.command, Commands::Nodes));
1162 }
1163
1164 #[test]
1165 fn test_cli_parse_ui() {
1166 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1167 assert!(matches!(cli.command, Commands::Ui));
1168 }
1169
1170 #[test]
1171 fn test_cli_parse_ui_custom_node() {
1172 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1173 assert!(matches!(cli.command, Commands::Ui));
1174 assert_eq!(cli.node, "mac-mini:7433");
1175 }
1176
1177 #[test]
1178 fn test_build_open_command() {
1179 let cmd = build_open_command("http://localhost:7433");
1180 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1181 assert_eq!(args, vec!["http://localhost:7433"]);
1182 #[cfg(target_os = "macos")]
1183 assert_eq!(cmd.get_program(), "open");
1184 #[cfg(target_os = "linux")]
1185 assert_eq!(cmd.get_program(), "xdg-open");
1186 }
1187
1188 #[test]
1189 fn test_cli_parse_spawn() {
1190 let cli = Cli::try_parse_from([
1191 "pulpo",
1192 "spawn",
1193 "--workdir",
1194 "/tmp/repo",
1195 "Fix",
1196 "the",
1197 "bug",
1198 ])
1199 .unwrap();
1200 assert!(matches!(
1201 &cli.command,
1202 Commands::Spawn { workdir, provider, auto, unrestricted, prompt, .. }
1203 if workdir.as_deref() == Some("/tmp/repo") && provider.is_none() && !auto
1204 && !unrestricted && prompt == &["Fix", "the", "bug"]
1205 ));
1206 }
1207
1208 #[test]
1209 fn test_cli_parse_spawn_with_provider() {
1210 let cli = Cli::try_parse_from([
1211 "pulpo",
1212 "spawn",
1213 "--workdir",
1214 "/tmp",
1215 "--provider",
1216 "codex",
1217 "Do it",
1218 ])
1219 .unwrap();
1220 assert!(matches!(
1221 &cli.command,
1222 Commands::Spawn { provider, .. } if provider.as_deref() == Some("codex")
1223 ));
1224 }
1225
1226 #[test]
1227 fn test_cli_parse_spawn_auto() {
1228 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "--auto", "Do it"])
1229 .unwrap();
1230 assert!(matches!(
1231 &cli.command,
1232 Commands::Spawn { auto, .. } if *auto
1233 ));
1234 }
1235
1236 #[test]
1237 fn test_cli_parse_spawn_unrestricted() {
1238 let cli = Cli::try_parse_from([
1239 "pulpo",
1240 "spawn",
1241 "--workdir",
1242 "/tmp",
1243 "--unrestricted",
1244 "Do it",
1245 ])
1246 .unwrap();
1247 assert!(matches!(
1248 &cli.command,
1249 Commands::Spawn { unrestricted, .. } if *unrestricted
1250 ));
1251 }
1252
1253 #[test]
1254 fn test_cli_parse_spawn_unrestricted_default() {
1255 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "Do it"]).unwrap();
1256 assert!(matches!(
1257 &cli.command,
1258 Commands::Spawn { unrestricted, .. } if !unrestricted
1259 ));
1260 }
1261
1262 #[test]
1263 fn test_cli_parse_spawn_with_name() {
1264 let cli = Cli::try_parse_from([
1265 "pulpo",
1266 "spawn",
1267 "--workdir",
1268 "/tmp/repo",
1269 "--name",
1270 "my-task",
1271 "Fix it",
1272 ])
1273 .unwrap();
1274 assert!(matches!(
1275 &cli.command,
1276 Commands::Spawn { workdir, name, .. }
1277 if workdir.as_deref() == Some("/tmp/repo") && name.as_deref() == Some("my-task")
1278 ));
1279 }
1280
1281 #[test]
1282 fn test_cli_parse_spawn_without_name() {
1283 let cli =
1284 Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"]).unwrap();
1285 assert!(matches!(
1286 &cli.command,
1287 Commands::Spawn { name, .. } if name.is_none()
1288 ));
1289 }
1290
1291 #[test]
1292 fn test_cli_parse_logs() {
1293 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1294 assert!(matches!(
1295 &cli.command,
1296 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1297 ));
1298 }
1299
1300 #[test]
1301 fn test_cli_parse_logs_with_lines() {
1302 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1303 assert!(matches!(
1304 &cli.command,
1305 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 50 && !follow
1306 ));
1307 }
1308
1309 #[test]
1310 fn test_cli_parse_logs_follow() {
1311 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1312 assert!(matches!(
1313 &cli.command,
1314 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1315 ));
1316 }
1317
1318 #[test]
1319 fn test_cli_parse_logs_follow_short() {
1320 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1321 assert!(matches!(
1322 &cli.command,
1323 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1324 ));
1325 }
1326
1327 #[test]
1328 fn test_cli_parse_kill() {
1329 let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1330 assert!(matches!(
1331 &cli.command,
1332 Commands::Kill { name } if name == "my-session"
1333 ));
1334 }
1335
1336 #[test]
1337 fn test_cli_parse_delete() {
1338 let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1339 assert!(matches!(
1340 &cli.command,
1341 Commands::Delete { name } if name == "my-session"
1342 ));
1343 }
1344
1345 #[test]
1346 fn test_cli_parse_resume() {
1347 let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1348 assert!(matches!(
1349 &cli.command,
1350 Commands::Resume { name } if name == "my-session"
1351 ));
1352 }
1353
1354 #[test]
1355 fn test_cli_parse_input() {
1356 let cli = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
1357 assert!(matches!(
1358 &cli.command,
1359 Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("yes")
1360 ));
1361 }
1362
1363 #[test]
1364 fn test_cli_parse_input_no_text() {
1365 let cli = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
1366 assert!(matches!(
1367 &cli.command,
1368 Commands::Input { name, text } if name == "my-session" && text.is_none()
1369 ));
1370 }
1371
1372 #[test]
1373 fn test_cli_parse_input_alias() {
1374 let cli = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
1375 assert!(matches!(
1376 &cli.command,
1377 Commands::Input { name, text } if name == "my-session" && text.as_deref() == Some("y")
1378 ));
1379 }
1380
1381 #[test]
1382 fn test_cli_parse_custom_node() {
1383 let cli = Cli::try_parse_from(["pulpo", "--node", "win-pc:8080", "list"]).unwrap();
1384 assert_eq!(cli.node, "win-pc:8080");
1385 }
1386
1387 #[test]
1388 fn test_cli_version() {
1389 let result = Cli::try_parse_from(["pulpo", "--version"]);
1390 let err = result.unwrap_err();
1392 assert_eq!(err.kind(), clap::error::ErrorKind::DisplayVersion);
1393 }
1394
1395 #[test]
1396 fn test_cli_parse_no_subcommand_fails() {
1397 let result = Cli::try_parse_from(["pulpo"]);
1398 assert!(result.is_err());
1399 }
1400
1401 #[test]
1402 fn test_cli_debug() {
1403 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1404 let debug = format!("{cli:?}");
1405 assert!(debug.contains("List"));
1406 }
1407
1408 #[test]
1409 fn test_commands_debug() {
1410 let cmd = Commands::List;
1411 assert_eq!(format!("{cmd:?}"), "List");
1412 }
1413
1414 const TEST_SESSION_JSON: &str = r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","provider":"claude","prompt":"Fix bug","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1416
1417 fn test_create_response_json() -> String {
1419 format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1420 }
1421
1422 async fn start_test_server() -> String {
1424 use axum::http::StatusCode;
1425 use axum::{
1426 Json, Router,
1427 routing::{get, post},
1428 };
1429
1430 let create_json = test_create_response_json();
1431
1432 let app = Router::new()
1433 .route(
1434 "/api/v1/sessions",
1435 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1436 (StatusCode::CREATED, create_json.clone())
1437 }),
1438 )
1439 .route(
1440 "/api/v1/sessions/{id}",
1441 get(|| async { TEST_SESSION_JSON.to_owned() })
1442 .delete(|| async { StatusCode::NO_CONTENT }),
1443 )
1444 .route(
1445 "/api/v1/sessions/{id}/kill",
1446 post(|| async { StatusCode::NO_CONTENT }),
1447 )
1448 .route(
1449 "/api/v1/sessions/{id}/output",
1450 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1451 )
1452 .route(
1453 "/api/v1/peers",
1454 get(|| async {
1455 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1456 }),
1457 )
1458 .route(
1459 "/api/v1/sessions/{id}/resume",
1460 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1461 )
1462 .route(
1463 "/api/v1/sessions/{id}/interventions",
1464 get(|| async { "[]".to_owned() }),
1465 )
1466 .route(
1467 "/api/v1/sessions/{id}/input",
1468 post(|| async { StatusCode::NO_CONTENT }),
1469 );
1470
1471 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1472 let addr = listener.local_addr().unwrap();
1473 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1474 format!("127.0.0.1:{}", addr.port())
1475 }
1476
1477 #[tokio::test]
1478 async fn test_execute_list_success() {
1479 let node = start_test_server().await;
1480 let cli = Cli {
1481 node,
1482 token: None,
1483 command: Commands::List,
1484 };
1485 let result = execute(&cli).await.unwrap();
1486 assert_eq!(result, "No sessions.");
1487 }
1488
1489 #[tokio::test]
1490 async fn test_execute_nodes_success() {
1491 let node = start_test_server().await;
1492 let cli = Cli {
1493 node,
1494 token: None,
1495 command: Commands::Nodes,
1496 };
1497 let result = execute(&cli).await.unwrap();
1498 assert!(result.contains("test"));
1499 assert!(result.contains("(local)"));
1500 assert!(result.contains("NAME"));
1501 }
1502
1503 #[tokio::test]
1504 async fn test_execute_spawn_success() {
1505 let node = start_test_server().await;
1506 let cli = Cli {
1507 node,
1508 token: None,
1509 command: Commands::Spawn {
1510 workdir: Some("/tmp/repo".into()),
1511 name: None,
1512 provider: Some("claude".into()),
1513 auto: false,
1514 unrestricted: false,
1515 model: None,
1516 system_prompt: None,
1517 allowed_tools: None,
1518 ink: None,
1519 max_turns: None,
1520 max_budget: None,
1521 output_format: None,
1522 prompt: vec!["Fix".into(), "bug".into()],
1523 },
1524 };
1525 let result = execute(&cli).await.unwrap();
1526 assert!(result.contains("Created session"));
1527 assert!(result.contains("repo"));
1528 }
1529
1530 #[tokio::test]
1531 async fn test_execute_spawn_with_all_new_flags() {
1532 let node = start_test_server().await;
1533 let cli = Cli {
1534 node,
1535 token: None,
1536 command: Commands::Spawn {
1537 workdir: Some("/tmp/repo".into()),
1538 name: None,
1539 provider: Some("claude".into()),
1540 auto: false,
1541 unrestricted: false,
1542 model: Some("opus".into()),
1543 system_prompt: Some("Be helpful".into()),
1544 allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1545 ink: Some("coder".into()),
1546 max_turns: Some(5),
1547 max_budget: Some(2.5),
1548 output_format: Some("json".into()),
1549 prompt: vec!["Fix".into(), "bug".into()],
1550 },
1551 };
1552 let result = execute(&cli).await.unwrap();
1553 assert!(result.contains("Created session"));
1554 }
1555
1556 #[tokio::test]
1557 async fn test_execute_spawn_auto_mode() {
1558 let node = start_test_server().await;
1559 let cli = Cli {
1560 node,
1561 token: None,
1562 command: Commands::Spawn {
1563 workdir: Some("/tmp/repo".into()),
1564 name: None,
1565 provider: Some("claude".into()),
1566 auto: true,
1567 unrestricted: false,
1568 model: None,
1569 system_prompt: None,
1570 allowed_tools: None,
1571 ink: None,
1572 max_turns: None,
1573 max_budget: None,
1574 output_format: None,
1575 prompt: vec!["Do it".into()],
1576 },
1577 };
1578 let result = execute(&cli).await.unwrap();
1579 assert!(result.contains("Created session"));
1580 }
1581
1582 #[tokio::test]
1583 async fn test_execute_spawn_with_name() {
1584 let node = start_test_server().await;
1585 let cli = Cli {
1586 node,
1587 token: None,
1588 command: Commands::Spawn {
1589 workdir: Some("/tmp/repo".into()),
1590 name: Some("my-task".into()),
1591 provider: Some("claude".into()),
1592 auto: false,
1593 unrestricted: false,
1594 model: None,
1595 system_prompt: None,
1596 allowed_tools: None,
1597 ink: None,
1598 max_turns: None,
1599 max_budget: None,
1600 output_format: None,
1601 prompt: vec!["Fix".into(), "bug".into()],
1602 },
1603 };
1604 let result = execute(&cli).await.unwrap();
1605 assert!(result.contains("Created session"));
1606 }
1607
1608 #[tokio::test]
1609 async fn test_execute_kill_success() {
1610 let node = start_test_server().await;
1611 let cli = Cli {
1612 node,
1613 token: None,
1614 command: Commands::Kill {
1615 name: "test-session".into(),
1616 },
1617 };
1618 let result = execute(&cli).await.unwrap();
1619 assert!(result.contains("killed"));
1620 }
1621
1622 #[tokio::test]
1623 async fn test_execute_delete_success() {
1624 let node = start_test_server().await;
1625 let cli = Cli {
1626 node,
1627 token: None,
1628 command: Commands::Delete {
1629 name: "test-session".into(),
1630 },
1631 };
1632 let result = execute(&cli).await.unwrap();
1633 assert!(result.contains("deleted"));
1634 }
1635
1636 #[tokio::test]
1637 async fn test_execute_logs_success() {
1638 let node = start_test_server().await;
1639 let cli = Cli {
1640 node,
1641 token: None,
1642 command: Commands::Logs {
1643 name: "test-session".into(),
1644 lines: 50,
1645 follow: false,
1646 },
1647 };
1648 let result = execute(&cli).await.unwrap();
1649 assert!(result.contains("test output"));
1650 }
1651
1652 #[tokio::test]
1653 async fn test_execute_list_connection_refused() {
1654 let cli = Cli {
1655 node: "localhost:1".into(),
1656 token: None,
1657 command: Commands::List,
1658 };
1659 let result = execute(&cli).await;
1660 let err = result.unwrap_err().to_string();
1661 assert!(
1662 err.contains("Could not connect to pulpod"),
1663 "Expected friendly error, got: {err}"
1664 );
1665 assert!(err.contains("localhost:1"));
1666 }
1667
1668 #[tokio::test]
1669 async fn test_execute_nodes_connection_refused() {
1670 let cli = Cli {
1671 node: "localhost:1".into(),
1672 token: None,
1673 command: Commands::Nodes,
1674 };
1675 let result = execute(&cli).await;
1676 let err = result.unwrap_err().to_string();
1677 assert!(err.contains("Could not connect to pulpod"));
1678 }
1679
1680 #[tokio::test]
1681 async fn test_execute_kill_error_response() {
1682 use axum::{Router, http::StatusCode, routing::post};
1683
1684 let app = Router::new().route(
1685 "/api/v1/sessions/{id}/kill",
1686 post(|| async {
1687 (
1688 StatusCode::NOT_FOUND,
1689 "{\"error\":\"session not found: test-session\"}",
1690 )
1691 }),
1692 );
1693 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1694 let addr = listener.local_addr().unwrap();
1695 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1696 let node = format!("127.0.0.1:{}", addr.port());
1697
1698 let cli = Cli {
1699 node,
1700 token: None,
1701 command: Commands::Kill {
1702 name: "test-session".into(),
1703 },
1704 };
1705 let err = execute(&cli).await.unwrap_err();
1706 assert_eq!(err.to_string(), "session not found: test-session");
1707 }
1708
1709 #[tokio::test]
1710 async fn test_execute_delete_error_response() {
1711 use axum::{Router, http::StatusCode, routing::delete};
1712
1713 let app = Router::new().route(
1714 "/api/v1/sessions/{id}",
1715 delete(|| async {
1716 (
1717 StatusCode::CONFLICT,
1718 "{\"error\":\"cannot delete session in 'running' state\"}",
1719 )
1720 }),
1721 );
1722 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1723 let addr = listener.local_addr().unwrap();
1724 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1725 let node = format!("127.0.0.1:{}", addr.port());
1726
1727 let cli = Cli {
1728 node,
1729 token: None,
1730 command: Commands::Delete {
1731 name: "test-session".into(),
1732 },
1733 };
1734 let err = execute(&cli).await.unwrap_err();
1735 assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1736 }
1737
1738 #[tokio::test]
1739 async fn test_execute_logs_error_response() {
1740 use axum::{Router, http::StatusCode, routing::get};
1741
1742 let app = Router::new().route(
1743 "/api/v1/sessions/{id}/output",
1744 get(|| async {
1745 (
1746 StatusCode::NOT_FOUND,
1747 "{\"error\":\"session not found: ghost\"}",
1748 )
1749 }),
1750 );
1751 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1752 let addr = listener.local_addr().unwrap();
1753 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1754 let node = format!("127.0.0.1:{}", addr.port());
1755
1756 let cli = Cli {
1757 node,
1758 token: None,
1759 command: Commands::Logs {
1760 name: "ghost".into(),
1761 lines: 50,
1762 follow: false,
1763 },
1764 };
1765 let err = execute(&cli).await.unwrap_err();
1766 assert_eq!(err.to_string(), "session not found: ghost");
1767 }
1768
1769 #[tokio::test]
1770 async fn test_execute_resume_error_response() {
1771 use axum::{Router, http::StatusCode, routing::post};
1772
1773 let app = Router::new().route(
1774 "/api/v1/sessions/{id}/resume",
1775 post(|| async {
1776 (
1777 StatusCode::BAD_REQUEST,
1778 "{\"error\":\"session is not stale (status: running)\"}",
1779 )
1780 }),
1781 );
1782 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1783 let addr = listener.local_addr().unwrap();
1784 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1785 let node = format!("127.0.0.1:{}", addr.port());
1786
1787 let cli = Cli {
1788 node,
1789 token: None,
1790 command: Commands::Resume {
1791 name: "test-session".into(),
1792 },
1793 };
1794 let err = execute(&cli).await.unwrap_err();
1795 assert_eq!(err.to_string(), "session is not stale (status: running)");
1796 }
1797
1798 #[tokio::test]
1799 async fn test_execute_spawn_error_response() {
1800 use axum::{Router, http::StatusCode, routing::post};
1801
1802 let app = Router::new().route(
1803 "/api/v1/sessions",
1804 post(|| async {
1805 (
1806 StatusCode::INTERNAL_SERVER_ERROR,
1807 "{\"error\":\"failed to spawn session\"}",
1808 )
1809 }),
1810 );
1811 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1812 let addr = listener.local_addr().unwrap();
1813 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1814 let node = format!("127.0.0.1:{}", addr.port());
1815
1816 let cli = Cli {
1817 node,
1818 token: None,
1819 command: Commands::Spawn {
1820 workdir: Some("/tmp/repo".into()),
1821 name: None,
1822 provider: Some("claude".into()),
1823 auto: false,
1824 unrestricted: false,
1825 model: None,
1826 system_prompt: None,
1827 allowed_tools: None,
1828 ink: None,
1829 max_turns: None,
1830 max_budget: None,
1831 output_format: None,
1832 prompt: vec!["test".into()],
1833 },
1834 };
1835 let err = execute(&cli).await.unwrap_err();
1836 assert_eq!(err.to_string(), "failed to spawn session");
1837 }
1838
1839 #[tokio::test]
1840 async fn test_execute_interventions_error_response() {
1841 use axum::{Router, http::StatusCode, routing::get};
1842
1843 let app = Router::new().route(
1844 "/api/v1/sessions/{id}/interventions",
1845 get(|| async {
1846 (
1847 StatusCode::NOT_FOUND,
1848 "{\"error\":\"session not found: ghost\"}",
1849 )
1850 }),
1851 );
1852 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1853 let addr = listener.local_addr().unwrap();
1854 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1855 let node = format!("127.0.0.1:{}", addr.port());
1856
1857 let cli = Cli {
1858 node,
1859 token: None,
1860 command: Commands::Interventions {
1861 name: "ghost".into(),
1862 },
1863 };
1864 let err = execute(&cli).await.unwrap_err();
1865 assert_eq!(err.to_string(), "session not found: ghost");
1866 }
1867
1868 #[tokio::test]
1869 async fn test_execute_resume_success() {
1870 let node = start_test_server().await;
1871 let cli = Cli {
1872 node,
1873 token: None,
1874 command: Commands::Resume {
1875 name: "test-session".into(),
1876 },
1877 };
1878 let result = execute(&cli).await.unwrap();
1879 assert!(result.contains("Resumed session"));
1880 assert!(result.contains("repo"));
1881 }
1882
1883 #[tokio::test]
1884 async fn test_execute_input_success() {
1885 let node = start_test_server().await;
1886 let cli = Cli {
1887 node,
1888 token: None,
1889 command: Commands::Input {
1890 name: "test-session".into(),
1891 text: Some("yes".into()),
1892 },
1893 };
1894 let result = execute(&cli).await.unwrap();
1895 assert!(result.contains("Sent input to session test-session"));
1896 }
1897
1898 #[tokio::test]
1899 async fn test_execute_input_no_text() {
1900 let node = start_test_server().await;
1901 let cli = Cli {
1902 node,
1903 token: None,
1904 command: Commands::Input {
1905 name: "test-session".into(),
1906 text: None,
1907 },
1908 };
1909 let result = execute(&cli).await.unwrap();
1910 assert!(result.contains("Sent input to session test-session"));
1911 }
1912
1913 #[tokio::test]
1914 async fn test_execute_input_connection_refused() {
1915 let cli = Cli {
1916 node: "localhost:1".into(),
1917 token: None,
1918 command: Commands::Input {
1919 name: "test".into(),
1920 text: Some("y".into()),
1921 },
1922 };
1923 let result = execute(&cli).await;
1924 let err = result.unwrap_err().to_string();
1925 assert!(err.contains("Could not connect to pulpod"));
1926 }
1927
1928 #[tokio::test]
1929 async fn test_execute_input_error_response() {
1930 use axum::{Router, http::StatusCode, routing::post};
1931
1932 let app = Router::new().route(
1933 "/api/v1/sessions/{id}/input",
1934 post(|| async {
1935 (
1936 StatusCode::NOT_FOUND,
1937 "{\"error\":\"session not found: ghost\"}",
1938 )
1939 }),
1940 );
1941 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1942 let addr = listener.local_addr().unwrap();
1943 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1944 let node = format!("127.0.0.1:{}", addr.port());
1945
1946 let cli = Cli {
1947 node,
1948 token: None,
1949 command: Commands::Input {
1950 name: "ghost".into(),
1951 text: Some("y".into()),
1952 },
1953 };
1954 let err = execute(&cli).await.unwrap_err();
1955 assert_eq!(err.to_string(), "session not found: ghost");
1956 }
1957
1958 #[tokio::test]
1959 async fn test_execute_ui() {
1960 let cli = Cli {
1961 node: "localhost:7433".into(),
1962 token: None,
1963 command: Commands::Ui,
1964 };
1965 let result = execute(&cli).await.unwrap();
1966 assert!(result.contains("Opening"));
1967 assert!(result.contains("http://localhost:7433"));
1968 }
1969
1970 #[tokio::test]
1971 async fn test_execute_ui_custom_node() {
1972 let cli = Cli {
1973 node: "mac-mini:7433".into(),
1974 token: None,
1975 command: Commands::Ui,
1976 };
1977 let result = execute(&cli).await.unwrap();
1978 assert!(result.contains("http://mac-mini:7433"));
1979 }
1980
1981 #[test]
1982 fn test_format_sessions_empty() {
1983 assert_eq!(format_sessions(&[]), "No sessions.");
1984 }
1985
1986 #[test]
1987 fn test_format_sessions_with_data() {
1988 use chrono::Utc;
1989 use pulpo_common::session::{Provider, SessionMode, SessionStatus};
1990 use uuid::Uuid;
1991
1992 let sessions = vec![Session {
1993 id: Uuid::nil(),
1994 name: "my-api".into(),
1995 workdir: "/tmp/repo".into(),
1996 provider: Provider::Claude,
1997 prompt: "Fix the bug".into(),
1998 status: SessionStatus::Running,
1999 mode: SessionMode::Interactive,
2000 conversation_id: None,
2001 exit_code: None,
2002 backend_session_id: None,
2003 output_snapshot: None,
2004 guard_config: None,
2005 model: None,
2006 allowed_tools: None,
2007 system_prompt: None,
2008 metadata: None,
2009 ink: None,
2010 max_turns: None,
2011 max_budget_usd: None,
2012 output_format: None,
2013 intervention_reason: None,
2014 intervention_at: None,
2015 last_output_at: None,
2016 idle_since: None,
2017 waiting_for_input: false,
2018 created_at: Utc::now(),
2019 updated_at: Utc::now(),
2020 }];
2021 let output = format_sessions(&sessions);
2022 assert!(output.contains("NAME"));
2023 assert!(output.contains("my-api"));
2024 assert!(output.contains("running"));
2025 assert!(output.contains("claude"));
2026 assert!(output.contains("Fix the bug"));
2027 }
2028
2029 #[test]
2030 fn test_format_sessions_long_prompt_truncated() {
2031 use chrono::Utc;
2032 use pulpo_common::session::{Provider, SessionMode, SessionStatus};
2033 use uuid::Uuid;
2034
2035 let sessions = vec![Session {
2036 id: Uuid::nil(),
2037 name: "test".into(),
2038 workdir: "/tmp".into(),
2039 provider: Provider::Codex,
2040 prompt: "A very long prompt that exceeds forty characters in total length".into(),
2041 status: SessionStatus::Completed,
2042 mode: SessionMode::Autonomous,
2043 conversation_id: None,
2044 exit_code: None,
2045 backend_session_id: None,
2046 output_snapshot: None,
2047 guard_config: None,
2048 model: None,
2049 allowed_tools: None,
2050 system_prompt: None,
2051 metadata: None,
2052 ink: None,
2053 max_turns: None,
2054 max_budget_usd: None,
2055 output_format: None,
2056 intervention_reason: None,
2057 intervention_at: None,
2058 last_output_at: None,
2059 idle_since: None,
2060 waiting_for_input: false,
2061 created_at: Utc::now(),
2062 updated_at: Utc::now(),
2063 }];
2064 let output = format_sessions(&sessions);
2065 assert!(output.contains("..."));
2066 }
2067
2068 #[test]
2069 fn test_format_sessions_waiting_for_input() {
2070 use chrono::Utc;
2071 use pulpo_common::session::{Provider, SessionMode, SessionStatus};
2072 use uuid::Uuid;
2073
2074 let sessions = vec![Session {
2075 id: Uuid::nil(),
2076 name: "blocked".into(),
2077 workdir: "/tmp".into(),
2078 provider: Provider::Claude,
2079 prompt: "Fix bug".into(),
2080 status: SessionStatus::Running,
2081 mode: SessionMode::Interactive,
2082 conversation_id: None,
2083 exit_code: None,
2084 backend_session_id: None,
2085 output_snapshot: None,
2086 guard_config: None,
2087 model: None,
2088 allowed_tools: None,
2089 system_prompt: None,
2090 metadata: None,
2091 ink: None,
2092 max_turns: None,
2093 max_budget_usd: None,
2094 output_format: None,
2095 intervention_reason: None,
2096 intervention_at: None,
2097 last_output_at: None,
2098 idle_since: None,
2099 waiting_for_input: true,
2100 created_at: Utc::now(),
2101 updated_at: Utc::now(),
2102 }];
2103 let output = format_sessions(&sessions);
2104 assert!(output.contains("waiting"));
2105 assert!(!output.contains("running"));
2106 }
2107
2108 #[test]
2109 fn test_format_nodes() {
2110 use pulpo_common::node::NodeInfo;
2111 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2112
2113 let resp = PeersResponse {
2114 local: NodeInfo {
2115 name: "mac-mini".into(),
2116 hostname: "h".into(),
2117 os: "macos".into(),
2118 arch: "arm64".into(),
2119 cpus: 8,
2120 memory_mb: 16384,
2121 gpu: None,
2122 },
2123 peers: vec![PeerInfo {
2124 name: "win-pc".into(),
2125 address: "win-pc:7433".into(),
2126 status: PeerStatus::Online,
2127 node_info: None,
2128 session_count: Some(3),
2129 source: PeerSource::Configured,
2130 }],
2131 };
2132 let output = format_nodes(&resp);
2133 assert!(output.contains("mac-mini"));
2134 assert!(output.contains("(local)"));
2135 assert!(output.contains("win-pc"));
2136 assert!(output.contains('3'));
2137 }
2138
2139 #[test]
2140 fn test_format_nodes_no_session_count() {
2141 use pulpo_common::node::NodeInfo;
2142 use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};
2143
2144 let resp = PeersResponse {
2145 local: NodeInfo {
2146 name: "local".into(),
2147 hostname: "h".into(),
2148 os: "linux".into(),
2149 arch: "x86_64".into(),
2150 cpus: 4,
2151 memory_mb: 8192,
2152 gpu: None,
2153 },
2154 peers: vec![PeerInfo {
2155 name: "peer".into(),
2156 address: "peer:7433".into(),
2157 status: PeerStatus::Offline,
2158 node_info: None,
2159 session_count: None,
2160 source: PeerSource::Configured,
2161 }],
2162 };
2163 let output = format_nodes(&resp);
2164 assert!(output.contains("offline"));
2165 let lines: Vec<&str> = output.lines().collect();
2167 assert!(lines[2].contains('-'));
2168 }
2169
2170 #[tokio::test]
2171 async fn test_execute_resume_connection_refused() {
2172 let cli = Cli {
2173 node: "localhost:1".into(),
2174 token: None,
2175 command: Commands::Resume {
2176 name: "test".into(),
2177 },
2178 };
2179 let result = execute(&cli).await;
2180 let err = result.unwrap_err().to_string();
2181 assert!(err.contains("Could not connect to pulpod"));
2182 }
2183
2184 #[tokio::test]
2185 async fn test_execute_spawn_connection_refused() {
2186 let cli = Cli {
2187 node: "localhost:1".into(),
2188 token: None,
2189 command: Commands::Spawn {
2190 workdir: Some("/tmp".into()),
2191 name: None,
2192 provider: Some("claude".into()),
2193 auto: false,
2194 unrestricted: false,
2195 model: None,
2196 system_prompt: None,
2197 allowed_tools: None,
2198 ink: None,
2199 max_turns: None,
2200 max_budget: None,
2201 output_format: None,
2202 prompt: vec!["test".into()],
2203 },
2204 };
2205 let result = execute(&cli).await;
2206 let err = result.unwrap_err().to_string();
2207 assert!(err.contains("Could not connect to pulpod"));
2208 }
2209
2210 #[tokio::test]
2211 async fn test_execute_kill_connection_refused() {
2212 let cli = Cli {
2213 node: "localhost:1".into(),
2214 token: None,
2215 command: Commands::Kill {
2216 name: "test".into(),
2217 },
2218 };
2219 let result = execute(&cli).await;
2220 let err = result.unwrap_err().to_string();
2221 assert!(err.contains("Could not connect to pulpod"));
2222 }
2223
2224 #[tokio::test]
2225 async fn test_execute_delete_connection_refused() {
2226 let cli = Cli {
2227 node: "localhost:1".into(),
2228 token: None,
2229 command: Commands::Delete {
2230 name: "test".into(),
2231 },
2232 };
2233 let result = execute(&cli).await;
2234 let err = result.unwrap_err().to_string();
2235 assert!(err.contains("Could not connect to pulpod"));
2236 }
2237
2238 #[tokio::test]
2239 async fn test_execute_logs_connection_refused() {
2240 let cli = Cli {
2241 node: "localhost:1".into(),
2242 token: None,
2243 command: Commands::Logs {
2244 name: "test".into(),
2245 lines: 50,
2246 follow: false,
2247 },
2248 };
2249 let result = execute(&cli).await;
2250 let err = result.unwrap_err().to_string();
2251 assert!(err.contains("Could not connect to pulpod"));
2252 }
2253
2254 #[tokio::test]
2255 async fn test_friendly_error_connect() {
2256 let err = reqwest::Client::new()
2258 .get("http://127.0.0.1:1")
2259 .send()
2260 .await
2261 .unwrap_err();
2262 let friendly = friendly_error(&err, "test-node:1");
2263 let msg = friendly.to_string();
2264 assert!(
2265 msg.contains("Could not connect"),
2266 "Expected connect message, got: {msg}"
2267 );
2268 }
2269
2270 #[tokio::test]
2271 async fn test_friendly_error_other() {
2272 let err = reqwest::Client::new()
2274 .get("http://[::invalid::url")
2275 .send()
2276 .await
2277 .unwrap_err();
2278 let friendly = friendly_error(&err, "bad-host");
2279 let msg = friendly.to_string();
2280 assert!(
2281 msg.contains("Network error"),
2282 "Expected network error message, got: {msg}"
2283 );
2284 assert!(msg.contains("bad-host"));
2285 }
2286
2287 #[test]
2290 fn test_is_localhost_variants() {
2291 assert!(is_localhost("localhost:7433"));
2292 assert!(is_localhost("127.0.0.1:7433"));
2293 assert!(is_localhost("[::1]:7433"));
2294 assert!(is_localhost("::1"));
2295 assert!(is_localhost("localhost"));
2296 assert!(!is_localhost("mac-mini:7433"));
2297 assert!(!is_localhost("192.168.1.100:7433"));
2298 }
2299
2300 #[test]
2301 fn test_authed_get_with_token() {
2302 let client = reqwest::Client::new();
2303 let req = authed_get(&client, "http://h:1/api".into(), Some("tok"))
2304 .build()
2305 .unwrap();
2306 let auth = req
2307 .headers()
2308 .get("authorization")
2309 .unwrap()
2310 .to_str()
2311 .unwrap();
2312 assert_eq!(auth, "Bearer tok");
2313 }
2314
2315 #[test]
2316 fn test_authed_get_without_token() {
2317 let client = reqwest::Client::new();
2318 let req = authed_get(&client, "http://h:1/api".into(), None)
2319 .build()
2320 .unwrap();
2321 assert!(req.headers().get("authorization").is_none());
2322 }
2323
2324 #[test]
2325 fn test_authed_post_with_token() {
2326 let client = reqwest::Client::new();
2327 let req = authed_post(&client, "http://h:1/api".into(), Some("secret"))
2328 .build()
2329 .unwrap();
2330 let auth = req
2331 .headers()
2332 .get("authorization")
2333 .unwrap()
2334 .to_str()
2335 .unwrap();
2336 assert_eq!(auth, "Bearer secret");
2337 }
2338
2339 #[test]
2340 fn test_authed_post_without_token() {
2341 let client = reqwest::Client::new();
2342 let req = authed_post(&client, "http://h:1/api".into(), None)
2343 .build()
2344 .unwrap();
2345 assert!(req.headers().get("authorization").is_none());
2346 }
2347
2348 #[test]
2349 fn test_authed_delete_with_token() {
2350 let client = reqwest::Client::new();
2351 let req = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
2352 .build()
2353 .unwrap();
2354 let auth = req
2355 .headers()
2356 .get("authorization")
2357 .unwrap()
2358 .to_str()
2359 .unwrap();
2360 assert_eq!(auth, "Bearer del-tok");
2361 }
2362
2363 #[test]
2364 fn test_authed_delete_without_token() {
2365 let client = reqwest::Client::new();
2366 let req = authed_delete(&client, "http://h:1/api".into(), None)
2367 .build()
2368 .unwrap();
2369 assert!(req.headers().get("authorization").is_none());
2370 }
2371
2372 #[tokio::test]
2373 async fn test_resolve_token_explicit() {
2374 let client = reqwest::Client::new();
2375 let token =
2376 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2377 assert_eq!(token, Some("my-tok".into()));
2378 }
2379
2380 #[tokio::test]
2381 async fn test_resolve_token_remote_no_explicit() {
2382 let client = reqwest::Client::new();
2383 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2384 assert_eq!(token, None);
2385 }
2386
2387 #[tokio::test]
2388 async fn test_resolve_token_localhost_auto_discover() {
2389 use axum::{Json, Router, routing::get};
2390
2391 let app = Router::new().route(
2392 "/api/v1/auth/token",
2393 get(|| async {
2394 Json(AuthTokenResponse {
2395 token: "discovered".into(),
2396 })
2397 }),
2398 );
2399 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2400 let addr = listener.local_addr().unwrap();
2401 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2402
2403 let node = format!("localhost:{}", addr.port());
2404 let base = base_url(&node);
2405 let client = reqwest::Client::new();
2406 let token = resolve_token(&client, &base, &node, None).await;
2407 assert_eq!(token, Some("discovered".into()));
2408 }
2409
2410 #[tokio::test]
2411 async fn test_discover_token_empty_returns_none() {
2412 use axum::{Json, Router, routing::get};
2413
2414 let app = Router::new().route(
2415 "/api/v1/auth/token",
2416 get(|| async {
2417 Json(AuthTokenResponse {
2418 token: String::new(),
2419 })
2420 }),
2421 );
2422 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2423 let addr = listener.local_addr().unwrap();
2424 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2425
2426 let base = format!("http://127.0.0.1:{}", addr.port());
2427 let client = reqwest::Client::new();
2428 assert_eq!(discover_token(&client, &base).await, None);
2429 }
2430
2431 #[tokio::test]
2432 async fn test_discover_token_unreachable_returns_none() {
2433 let client = reqwest::Client::new();
2434 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2435 }
2436
2437 #[test]
2438 fn test_cli_parse_with_token() {
2439 let cli = Cli::try_parse_from(["pulpo", "--token", "my-secret", "list"]).unwrap();
2440 assert_eq!(cli.token, Some("my-secret".into()));
2441 }
2442
2443 #[test]
2444 fn test_cli_parse_without_token() {
2445 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
2446 assert_eq!(cli.token, None);
2447 }
2448
2449 #[tokio::test]
2450 async fn test_execute_with_explicit_token_sends_header() {
2451 use axum::{Router, extract::Request, http::StatusCode, routing::get};
2452
2453 let app = Router::new().route(
2454 "/api/v1/sessions",
2455 get(|req: Request| async move {
2456 let auth = req
2457 .headers()
2458 .get("authorization")
2459 .and_then(|v| v.to_str().ok())
2460 .unwrap_or("");
2461 assert_eq!(auth, "Bearer test-token");
2462 (StatusCode::OK, "[]".to_owned())
2463 }),
2464 );
2465 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2466 let addr = listener.local_addr().unwrap();
2467 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2468 let node = format!("127.0.0.1:{}", addr.port());
2469
2470 let cli = Cli {
2471 node,
2472 token: Some("test-token".into()),
2473 command: Commands::List,
2474 };
2475 let result = execute(&cli).await.unwrap();
2476 assert_eq!(result, "No sessions.");
2477 }
2478
2479 #[test]
2482 fn test_cli_parse_interventions() {
2483 let cli = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
2484 assert!(matches!(
2485 &cli.command,
2486 Commands::Interventions { name } if name == "my-session"
2487 ));
2488 }
2489
2490 #[test]
2491 fn test_format_interventions_empty() {
2492 assert_eq!(format_interventions(&[]), "No intervention events.");
2493 }
2494
2495 #[test]
2496 fn test_format_interventions_with_data() {
2497 let events = vec![
2498 InterventionEventResponse {
2499 id: 1,
2500 session_id: "sess-1".into(),
2501 reason: "Memory exceeded threshold".into(),
2502 created_at: "2026-01-01T00:00:00Z".into(),
2503 },
2504 InterventionEventResponse {
2505 id: 2,
2506 session_id: "sess-1".into(),
2507 reason: "Idle for 10 minutes".into(),
2508 created_at: "2026-01-02T00:00:00Z".into(),
2509 },
2510 ];
2511 let output = format_interventions(&events);
2512 assert!(output.contains("ID"));
2513 assert!(output.contains("TIMESTAMP"));
2514 assert!(output.contains("REASON"));
2515 assert!(output.contains("Memory exceeded threshold"));
2516 assert!(output.contains("Idle for 10 minutes"));
2517 assert!(output.contains("2026-01-01T00:00:00Z"));
2518 }
2519
2520 #[tokio::test]
2521 async fn test_execute_interventions_empty() {
2522 let node = start_test_server().await;
2523 let cli = Cli {
2524 node,
2525 token: None,
2526 command: Commands::Interventions {
2527 name: "my-session".into(),
2528 },
2529 };
2530 let result = execute(&cli).await.unwrap();
2531 assert_eq!(result, "No intervention events.");
2532 }
2533
2534 #[tokio::test]
2535 async fn test_execute_interventions_with_data() {
2536 use axum::{Router, routing::get};
2537
2538 let app = Router::new().route(
2539 "/api/v1/sessions/{id}/interventions",
2540 get(|| async {
2541 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2542 .to_owned()
2543 }),
2544 );
2545 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2546 let addr = listener.local_addr().unwrap();
2547 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2548 let node = format!("127.0.0.1:{}", addr.port());
2549
2550 let cli = Cli {
2551 node,
2552 token: None,
2553 command: Commands::Interventions {
2554 name: "test".into(),
2555 },
2556 };
2557 let result = execute(&cli).await.unwrap();
2558 assert!(result.contains("OOM"));
2559 assert!(result.contains("2026-01-01T00:00:00Z"));
2560 }
2561
2562 #[tokio::test]
2563 async fn test_execute_interventions_connection_refused() {
2564 let cli = Cli {
2565 node: "localhost:1".into(),
2566 token: None,
2567 command: Commands::Interventions {
2568 name: "test".into(),
2569 },
2570 };
2571 let result = execute(&cli).await;
2572 let err = result.unwrap_err().to_string();
2573 assert!(err.contains("Could not connect to pulpod"));
2574 }
2575
2576 #[test]
2579 fn test_build_attach_command() {
2580 let cmd = build_attach_command("pulpo-my-session");
2581 assert_eq!(cmd.get_program(), "tmux");
2582 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2583 assert_eq!(args, vec!["attach-session", "-t", "pulpo-my-session"]);
2584 }
2585
2586 #[test]
2587 fn test_cli_parse_attach() {
2588 let cli = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
2589 assert!(matches!(
2590 &cli.command,
2591 Commands::Attach { name } if name == "my-session"
2592 ));
2593 }
2594
2595 #[test]
2596 fn test_cli_parse_attach_alias() {
2597 let cli = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
2598 assert!(matches!(
2599 &cli.command,
2600 Commands::Attach { name } if name == "my-session"
2601 ));
2602 }
2603
2604 #[tokio::test]
2605 async fn test_execute_attach_success() {
2606 let node = start_test_server().await;
2607 let cli = Cli {
2608 node,
2609 token: None,
2610 command: Commands::Attach {
2611 name: "test-session".into(),
2612 },
2613 };
2614 let result = execute(&cli).await.unwrap();
2615 assert!(result.contains("Detached from session test-session"));
2616 }
2617
2618 #[tokio::test]
2619 async fn test_execute_attach_with_backend_session_id() {
2620 use axum::{Router, routing::get};
2621 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","provider":"claude","prompt":"test","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"pulpo-my-session","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2622 let app = Router::new().route(
2623 "/api/v1/sessions/{id}",
2624 get(move || async move { session_json.to_owned() }),
2625 );
2626 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2627 let addr = listener.local_addr().unwrap();
2628 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2629
2630 let cli = Cli {
2631 node: format!("127.0.0.1:{}", addr.port()),
2632 token: None,
2633 command: Commands::Attach {
2634 name: "my-session".into(),
2635 },
2636 };
2637 let result = execute(&cli).await.unwrap();
2638 assert!(result.contains("Detached from session my-session"));
2639 }
2640
2641 #[tokio::test]
2642 async fn test_execute_attach_connection_refused() {
2643 let cli = Cli {
2644 node: "localhost:1".into(),
2645 token: None,
2646 command: Commands::Attach {
2647 name: "test-session".into(),
2648 },
2649 };
2650 let result = execute(&cli).await;
2651 let err = result.unwrap_err().to_string();
2652 assert!(err.contains("Could not connect to pulpod"));
2653 }
2654
2655 #[tokio::test]
2656 async fn test_execute_attach_error_response() {
2657 use axum::{Router, http::StatusCode, routing::get};
2658 let app = Router::new().route(
2659 "/api/v1/sessions/{id}",
2660 get(|| async {
2661 (
2662 StatusCode::NOT_FOUND,
2663 r#"{"error":"session not found"}"#.to_owned(),
2664 )
2665 }),
2666 );
2667 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2668 let addr = listener.local_addr().unwrap();
2669 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2670
2671 let cli = Cli {
2672 node: format!("127.0.0.1:{}", addr.port()),
2673 token: None,
2674 command: Commands::Attach {
2675 name: "nonexistent".into(),
2676 },
2677 };
2678 let result = execute(&cli).await;
2679 let err = result.unwrap_err().to_string();
2680 assert!(err.contains("session not found"));
2681 }
2682
2683 #[test]
2686 fn test_cli_parse_alias_spawn() {
2687 let cli = Cli::try_parse_from(["pulpo", "s", "--workdir", "/tmp", "Do it"]).unwrap();
2688 assert!(matches!(&cli.command, Commands::Spawn { .. }));
2689 }
2690
2691 #[test]
2692 fn test_cli_parse_alias_list() {
2693 let cli = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
2694 assert!(matches!(cli.command, Commands::List));
2695 }
2696
2697 #[test]
2698 fn test_cli_parse_alias_logs() {
2699 let cli = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
2700 assert!(matches!(
2701 &cli.command,
2702 Commands::Logs { name, .. } if name == "my-session"
2703 ));
2704 }
2705
2706 #[test]
2707 fn test_cli_parse_alias_kill() {
2708 let cli = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
2709 assert!(matches!(
2710 &cli.command,
2711 Commands::Kill { name } if name == "my-session"
2712 ));
2713 }
2714
2715 #[test]
2716 fn test_cli_parse_alias_delete() {
2717 let cli = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
2718 assert!(matches!(
2719 &cli.command,
2720 Commands::Delete { name } if name == "my-session"
2721 ));
2722 }
2723
2724 #[test]
2725 fn test_cli_parse_alias_resume() {
2726 let cli = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
2727 assert!(matches!(
2728 &cli.command,
2729 Commands::Resume { name } if name == "my-session"
2730 ));
2731 }
2732
2733 #[test]
2734 fn test_cli_parse_alias_nodes() {
2735 let cli = Cli::try_parse_from(["pulpo", "n"]).unwrap();
2736 assert!(matches!(cli.command, Commands::Nodes));
2737 }
2738
2739 #[test]
2740 fn test_cli_parse_alias_interventions() {
2741 let cli = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
2742 assert!(matches!(
2743 &cli.command,
2744 Commands::Interventions { name } if name == "my-session"
2745 ));
2746 }
2747
2748 #[test]
2749 fn test_api_error_json() {
2750 let err = api_error("{\"error\":\"session not found: foo\"}");
2751 assert_eq!(err.to_string(), "session not found: foo");
2752 }
2753
2754 #[test]
2755 fn test_api_error_plain_text() {
2756 let err = api_error("plain text error");
2757 assert_eq!(err.to_string(), "plain text error");
2758 }
2759
2760 #[test]
2763 fn test_diff_output_empty_prev() {
2764 assert_eq!(diff_output("", "line1\nline2\n"), "line1\nline2\n");
2765 }
2766
2767 #[test]
2768 fn test_diff_output_identical() {
2769 assert_eq!(diff_output("line1\nline2", "line1\nline2"), "");
2770 }
2771
2772 #[test]
2773 fn test_diff_output_new_lines_appended() {
2774 let prev = "line1\nline2";
2775 let new = "line1\nline2\nline3\nline4";
2776 assert_eq!(diff_output(prev, new), "line3\nline4");
2777 }
2778
2779 #[test]
2780 fn test_diff_output_scrolled_window() {
2781 let prev = "line1\nline2\nline3";
2783 let new = "line2\nline3\nline4";
2784 assert_eq!(diff_output(prev, new), "line4");
2785 }
2786
2787 #[test]
2788 fn test_diff_output_completely_different() {
2789 let prev = "aaa\nbbb";
2790 let new = "xxx\nyyy";
2791 assert_eq!(diff_output(prev, new), "xxx\nyyy");
2792 }
2793
2794 #[test]
2795 fn test_diff_output_last_line_matches_but_overlap_fails() {
2796 let prev = "aaa\ncommon";
2798 let new = "zzz\ncommon\nnew_line";
2799 assert_eq!(diff_output(prev, new), "zzz\ncommon\nnew_line");
2803 }
2804
2805 #[test]
2806 fn test_diff_output_new_empty() {
2807 assert_eq!(diff_output("line1", ""), "");
2808 }
2809
2810 async fn start_follow_test_server() -> String {
2814 use axum::{Router, extract::Path, extract::Query, routing::get};
2815 use std::sync::Arc;
2816 use std::sync::atomic::{AtomicUsize, Ordering};
2817
2818 let call_count = Arc::new(AtomicUsize::new(0));
2819 let output_count = call_count.clone();
2820 let status_count = Arc::new(AtomicUsize::new(0));
2821 let status_count_inner = status_count.clone();
2822
2823 let app = Router::new()
2824 .route(
2825 "/api/v1/sessions/{id}/output",
2826 get(
2827 move |_path: Path<String>,
2828 _query: Query<std::collections::HashMap<String, String>>| {
2829 let count = output_count.clone();
2830 async move {
2831 let n = count.fetch_add(1, Ordering::SeqCst);
2832 let output = match n {
2833 0 => "line1\nline2".to_owned(),
2834 1 => "line1\nline2\nline3".to_owned(),
2835 _ => "line2\nline3\nline4".to_owned(),
2836 };
2837 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2838 }
2839 },
2840 ),
2841 )
2842 .route(
2843 "/api/v1/sessions/{id}",
2844 get(move |_path: Path<String>| {
2845 let count = status_count_inner.clone();
2846 async move {
2847 let n = count.fetch_add(1, Ordering::SeqCst);
2848 let status = if n < 2 { "running" } else { "completed" };
2849 format!(
2850 r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
2851 )
2852 }
2853 }),
2854 );
2855
2856 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2857 let addr = listener.local_addr().unwrap();
2858 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2859 format!("http://127.0.0.1:{}", addr.port())
2860 }
2861
2862 #[tokio::test]
2863 async fn test_follow_logs_polls_and_exits_on_completed() {
2864 let base = start_follow_test_server().await;
2865 let client = reqwest::Client::new();
2866 let mut buf = Vec::new();
2867
2868 follow_logs(&client, &base, "test", 100, None, &mut buf)
2869 .await
2870 .unwrap();
2871
2872 let output = String::from_utf8(buf).unwrap();
2873 assert!(output.contains("line1"));
2875 assert!(output.contains("line2"));
2876 assert!(output.contains("line3"));
2877 assert!(output.contains("line4"));
2878 }
2879
2880 #[tokio::test]
2881 async fn test_execute_logs_follow_success() {
2882 let base = start_follow_test_server().await;
2883 let node = base.strip_prefix("http://").unwrap().to_owned();
2885
2886 let cli = Cli {
2887 node,
2888 token: None,
2889 command: Commands::Logs {
2890 name: "test".into(),
2891 lines: 100,
2892 follow: true,
2893 },
2894 };
2895 let result = execute(&cli).await.unwrap();
2897 assert_eq!(result, "");
2898 }
2899
2900 #[tokio::test]
2901 async fn test_execute_logs_follow_connection_refused() {
2902 let cli = Cli {
2903 node: "localhost:1".into(),
2904 token: None,
2905 command: Commands::Logs {
2906 name: "test".into(),
2907 lines: 50,
2908 follow: true,
2909 },
2910 };
2911 let result = execute(&cli).await;
2912 let err = result.unwrap_err().to_string();
2913 assert!(
2914 err.contains("Could not connect to pulpod"),
2915 "Expected friendly error, got: {err}"
2916 );
2917 }
2918
2919 #[tokio::test]
2920 async fn test_follow_logs_exits_on_dead() {
2921 use axum::{Router, extract::Path, extract::Query, routing::get};
2922
2923 let app = Router::new()
2924 .route(
2925 "/api/v1/sessions/{id}/output",
2926 get(
2927 |_path: Path<String>,
2928 _query: Query<std::collections::HashMap<String, String>>| async {
2929 r#"{"output":"some output"}"#.to_owned()
2930 },
2931 ),
2932 )
2933 .route(
2934 "/api/v1/sessions/{id}",
2935 get(|_path: Path<String>| async {
2936 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2937 }),
2938 );
2939
2940 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2941 let addr = listener.local_addr().unwrap();
2942 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2943 let base = format!("http://127.0.0.1:{}", addr.port());
2944
2945 let client = reqwest::Client::new();
2946 let mut buf = Vec::new();
2947 follow_logs(&client, &base, "test", 100, None, &mut buf)
2948 .await
2949 .unwrap();
2950
2951 let output = String::from_utf8(buf).unwrap();
2952 assert!(output.contains("some output"));
2953 }
2954
2955 #[tokio::test]
2956 async fn test_follow_logs_exits_on_stale() {
2957 use axum::{Router, extract::Path, extract::Query, routing::get};
2958
2959 let app = Router::new()
2960 .route(
2961 "/api/v1/sessions/{id}/output",
2962 get(
2963 |_path: Path<String>,
2964 _query: Query<std::collections::HashMap<String, String>>| async {
2965 r#"{"output":"stale output"}"#.to_owned()
2966 },
2967 ),
2968 )
2969 .route(
2970 "/api/v1/sessions/{id}",
2971 get(|_path: Path<String>| async {
2972 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2973 }),
2974 );
2975
2976 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2977 let addr = listener.local_addr().unwrap();
2978 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2979 let base = format!("http://127.0.0.1:{}", addr.port());
2980
2981 let client = reqwest::Client::new();
2982 let mut buf = Vec::new();
2983 follow_logs(&client, &base, "test", 100, None, &mut buf)
2984 .await
2985 .unwrap();
2986
2987 let output = String::from_utf8(buf).unwrap();
2988 assert!(output.contains("stale output"));
2989 }
2990
2991 #[tokio::test]
2992 async fn test_execute_logs_follow_non_reqwest_error() {
2993 use axum::{Router, extract::Path, extract::Query, routing::get};
2994
2995 let app = Router::new()
2997 .route(
2998 "/api/v1/sessions/{id}/output",
2999 get(
3000 |_path: Path<String>,
3001 _query: Query<std::collections::HashMap<String, String>>| async {
3002 r#"{"output":"initial"}"#.to_owned()
3003 },
3004 ),
3005 )
3006 .route(
3007 "/api/v1/sessions/{id}",
3008 get(|_path: Path<String>| async { "not valid json".to_owned() }),
3009 );
3010
3011 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3012 let addr = listener.local_addr().unwrap();
3013 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3014 let node = format!("127.0.0.1:{}", addr.port());
3015
3016 let cli = Cli {
3017 node,
3018 token: None,
3019 command: Commands::Logs {
3020 name: "test".into(),
3021 lines: 100,
3022 follow: true,
3023 },
3024 };
3025 let err = execute(&cli).await.unwrap_err();
3026 let msg = err.to_string();
3028 assert!(
3029 msg.contains("expected ident"),
3030 "Expected serde parse error, got: {msg}"
3031 );
3032 }
3033
3034 #[test]
3035 fn test_cli_parse_spawn_with_guardrails() {
3036 let cli = Cli::try_parse_from([
3037 "pulpo",
3038 "spawn",
3039 "--workdir",
3040 "/tmp",
3041 "--max-turns",
3042 "10",
3043 "--max-budget",
3044 "5.5",
3045 "--output-format",
3046 "json",
3047 "Do it",
3048 ])
3049 .unwrap();
3050 assert!(matches!(
3051 &cli.command,
3052 Commands::Spawn { max_turns, max_budget, output_format, .. }
3053 if *max_turns == Some(10) && *max_budget == Some(5.5)
3054 && output_format.as_deref() == Some("json")
3055 ));
3056 }
3057
3058 #[tokio::test]
3059 async fn test_fetch_session_status_connection_error() {
3060 let client = reqwest::Client::new();
3061 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3062 assert!(result.is_err());
3063 }
3064
3065 #[test]
3068 fn test_build_crontab_line() {
3069 let line = build_crontab_line(
3070 "nightly-review",
3071 "0 3 * * *",
3072 "/home/me/repo",
3073 "claude",
3074 "Review PRs",
3075 "localhost:7433",
3076 );
3077 assert_eq!(
3078 line,
3079 "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n"
3080 );
3081 }
3082
3083 #[test]
3084 fn test_crontab_install_success() {
3085 let crontab = "# existing cron\n0 * * * * echo hi\n";
3086 let line = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";
3087 let result = crontab_install(crontab, "my-job", line).unwrap();
3088 assert!(result.starts_with("# existing cron\n"));
3089 assert!(result.ends_with("#pulpo:my-job\n"));
3090 assert!(result.contains("echo hi"));
3091 }
3092
3093 #[test]
3094 fn test_crontab_install_duplicate_error() {
3095 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3096 let line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
3097 let err = crontab_install(crontab, "my-job", line).unwrap_err();
3098 assert!(err.to_string().contains("already exists"));
3099 }
3100
3101 #[test]
3102 fn test_crontab_list_empty() {
3103 assert_eq!(crontab_list(""), "No pulpo schedules.");
3104 }
3105
3106 #[test]
3107 fn test_crontab_list_no_pulpo_entries() {
3108 assert_eq!(crontab_list("0 * * * * echo hi\n"), "No pulpo schedules.");
3109 }
3110
3111 #[test]
3112 fn test_crontab_list_with_entries() {
3113 let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
3114 let output = crontab_list(crontab);
3115 assert!(output.contains("NAME"));
3116 assert!(output.contains("CRON"));
3117 assert!(output.contains("PAUSED"));
3118 assert!(output.contains("nightly"));
3119 assert!(output.contains("0 3 * * *"));
3120 assert!(output.contains("no"));
3121 }
3122
3123 #[test]
3124 fn test_crontab_list_paused_entry() {
3125 let crontab = "#0 3 * * * pulpo spawn task #pulpo:paused-job\n";
3126 let output = crontab_list(crontab);
3127 assert!(output.contains("paused-job"));
3128 assert!(output.contains("yes"));
3129 }
3130
3131 #[test]
3132 fn test_crontab_list_short_line() {
3133 let crontab = "badcron #pulpo:broken\n";
3135 let output = crontab_list(crontab);
3136 assert!(output.contains("broken"));
3137 assert!(output.contains('?'));
3138 }
3139
3140 #[test]
3141 fn test_crontab_remove_success() {
3142 let crontab = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
3143 let result = crontab_remove(crontab, "my-job").unwrap();
3144 assert!(result.contains("echo hi"));
3145 assert!(!result.contains("my-job"));
3146 }
3147
3148 #[test]
3149 fn test_crontab_remove_not_found() {
3150 let crontab = "0 * * * * echo hi\n";
3151 let err = crontab_remove(crontab, "ghost").unwrap_err();
3152 assert!(err.to_string().contains("not found"));
3153 }
3154
3155 #[test]
3156 fn test_crontab_pause_success() {
3157 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3158 let result = crontab_pause(crontab, "my-job").unwrap();
3159 assert!(result.starts_with('#'));
3160 assert!(result.contains("#pulpo:my-job"));
3161 }
3162
3163 #[test]
3164 fn test_crontab_pause_not_found() {
3165 let crontab = "0 * * * * echo hi\n";
3166 let err = crontab_pause(crontab, "ghost").unwrap_err();
3167 assert!(err.to_string().contains("not found or already paused"));
3168 }
3169
3170 #[test]
3171 fn test_crontab_pause_already_paused() {
3172 let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3173 let err = crontab_pause(crontab, "my-job").unwrap_err();
3174 assert!(err.to_string().contains("already paused"));
3175 }
3176
3177 #[test]
3178 fn test_crontab_resume_success() {
3179 let crontab = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
3180 let result = crontab_resume(crontab, "my-job").unwrap();
3181 assert!(!result.starts_with('#'));
3182 assert!(result.contains("#pulpo:my-job"));
3183 }
3184
3185 #[test]
3186 fn test_crontab_resume_not_found() {
3187 let crontab = "0 * * * * echo hi\n";
3188 let err = crontab_resume(crontab, "ghost").unwrap_err();
3189 assert!(err.to_string().contains("not found or not paused"));
3190 }
3191
3192 #[test]
3193 fn test_crontab_resume_not_paused() {
3194 let crontab = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
3195 let err = crontab_resume(crontab, "my-job").unwrap_err();
3196 assert!(err.to_string().contains("not paused"));
3197 }
3198
3199 #[test]
3202 fn test_cli_parse_schedule_install() {
3203 let cli = Cli::try_parse_from([
3204 "pulpo",
3205 "schedule",
3206 "install",
3207 "nightly",
3208 "0 3 * * *",
3209 "--workdir",
3210 "/tmp/repo",
3211 "Review",
3212 "PRs",
3213 ])
3214 .unwrap();
3215 assert!(matches!(
3216 &cli.command,
3217 Commands::Schedule {
3218 action: ScheduleAction::Install { name, cron, workdir, provider, prompt }
3219 } if name == "nightly" && cron == "0 3 * * *" && workdir == "/tmp/repo"
3220 && provider == "claude" && prompt == &["Review", "PRs"]
3221 ));
3222 }
3223
3224 #[test]
3225 fn test_cli_parse_schedule_list() {
3226 let cli = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
3227 assert!(matches!(
3228 &cli.command,
3229 Commands::Schedule {
3230 action: ScheduleAction::List
3231 }
3232 ));
3233 }
3234
3235 #[test]
3236 fn test_cli_parse_schedule_remove() {
3237 let cli = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
3238 assert!(matches!(
3239 &cli.command,
3240 Commands::Schedule {
3241 action: ScheduleAction::Remove { name }
3242 } if name == "nightly"
3243 ));
3244 }
3245
3246 #[test]
3247 fn test_cli_parse_schedule_pause() {
3248 let cli = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
3249 assert!(matches!(
3250 &cli.command,
3251 Commands::Schedule {
3252 action: ScheduleAction::Pause { name }
3253 } if name == "nightly"
3254 ));
3255 }
3256
3257 #[test]
3258 fn test_cli_parse_schedule_resume() {
3259 let cli = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
3260 assert!(matches!(
3261 &cli.command,
3262 Commands::Schedule {
3263 action: ScheduleAction::Resume { name }
3264 } if name == "nightly"
3265 ));
3266 }
3267
3268 #[test]
3269 fn test_cli_parse_schedule_alias() {
3270 let cli = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
3271 assert!(matches!(
3272 &cli.command,
3273 Commands::Schedule {
3274 action: ScheduleAction::List
3275 }
3276 ));
3277 }
3278
3279 #[test]
3280 fn test_cli_parse_schedule_list_alias() {
3281 let cli = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
3282 assert!(matches!(
3283 &cli.command,
3284 Commands::Schedule {
3285 action: ScheduleAction::List
3286 }
3287 ));
3288 }
3289
3290 #[test]
3291 fn test_cli_parse_schedule_remove_alias() {
3292 let cli = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
3293 assert!(matches!(
3294 &cli.command,
3295 Commands::Schedule {
3296 action: ScheduleAction::Remove { name }
3297 } if name == "nightly"
3298 ));
3299 }
3300
3301 #[test]
3302 fn test_cli_parse_schedule_install_custom_provider() {
3303 let cli = Cli::try_parse_from([
3304 "pulpo",
3305 "schedule",
3306 "install",
3307 "daily",
3308 "0 9 * * *",
3309 "--workdir",
3310 "/tmp",
3311 "--provider",
3312 "codex",
3313 "Run tests",
3314 ])
3315 .unwrap();
3316 assert!(matches!(
3317 &cli.command,
3318 Commands::Schedule {
3319 action: ScheduleAction::Install { provider, .. }
3320 } if provider == "codex"
3321 ));
3322 }
3323
3324 #[tokio::test]
3325 async fn test_execute_schedule_via_execute() {
3326 let node = start_test_server().await;
3328 let cli = Cli {
3329 node,
3330 token: None,
3331 command: Commands::Schedule {
3332 action: ScheduleAction::List,
3333 },
3334 };
3335 let result = execute(&cli).await;
3336 assert!(result.is_ok() || result.is_err());
3338 }
3339
3340 #[test]
3341 fn test_schedule_action_debug() {
3342 let action = ScheduleAction::List;
3343 assert_eq!(format!("{action:?}"), "List");
3344 }
3345
3346 #[test]
3349 fn test_cli_parse_knowledge() {
3350 let cli = Cli::try_parse_from(["pulpo", "knowledge"]).unwrap();
3351 assert!(matches!(cli.command, Commands::Knowledge { .. }));
3352 }
3353
3354 #[test]
3355 fn test_cli_parse_knowledge_alias() {
3356 let cli = Cli::try_parse_from(["pulpo", "kn"]).unwrap();
3357 assert!(matches!(cli.command, Commands::Knowledge { .. }));
3358 }
3359
3360 #[test]
3361 fn test_cli_parse_knowledge_with_filters() {
3362 let cli = Cli::try_parse_from([
3363 "pulpo",
3364 "knowledge",
3365 "--kind",
3366 "failure",
3367 "--repo",
3368 "/tmp/repo",
3369 "--ink",
3370 "coder",
3371 "--limit",
3372 "5",
3373 ])
3374 .unwrap();
3375 match &cli.command {
3376 Commands::Knowledge {
3377 kind,
3378 repo,
3379 ink,
3380 limit,
3381 ..
3382 } => {
3383 assert_eq!(kind.as_deref(), Some("failure"));
3384 assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3385 assert_eq!(ink.as_deref(), Some("coder"));
3386 assert_eq!(*limit, 5);
3387 }
3388 _ => panic!("expected Knowledge command"),
3389 }
3390 }
3391
3392 #[test]
3393 fn test_cli_parse_knowledge_context() {
3394 let cli = Cli::try_parse_from(["pulpo", "knowledge", "--context", "--repo", "/tmp/repo"])
3395 .unwrap();
3396 match &cli.command {
3397 Commands::Knowledge { context, repo, .. } => {
3398 assert!(*context);
3399 assert_eq!(repo.as_deref(), Some("/tmp/repo"));
3400 }
3401 _ => panic!("expected Knowledge command"),
3402 }
3403 }
3404
3405 #[test]
3406 fn test_format_knowledge_empty() {
3407 assert_eq!(format_knowledge(&[]), "No knowledge found.");
3408 }
3409
3410 #[test]
3411 fn test_format_knowledge_items() {
3412 use chrono::Utc;
3413 use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3414 use uuid::Uuid;
3415
3416 let items = vec![
3417 Knowledge {
3418 id: Uuid::new_v4(),
3419 session_id: Uuid::new_v4(),
3420 kind: KnowledgeKind::Summary,
3421 scope_repo: Some("/tmp/repo".into()),
3422 scope_ink: Some("coder".into()),
3423 title: "Fixed the auth bug".into(),
3424 body: "Details".into(),
3425 tags: vec!["claude".into(), "completed".into()],
3426 relevance: 0.7,
3427 created_at: Utc::now(),
3428 },
3429 Knowledge {
3430 id: Uuid::new_v4(),
3431 session_id: Uuid::new_v4(),
3432 kind: KnowledgeKind::Failure,
3433 scope_repo: None,
3434 scope_ink: None,
3435 title: "OOM crash during build".into(),
3436 body: "Details".into(),
3437 tags: vec!["failure".into()],
3438 relevance: 0.9,
3439 created_at: Utc::now(),
3440 },
3441 ];
3442
3443 let output = format_knowledge(&items);
3444 assert!(output.contains("KIND"));
3445 assert!(output.contains("TITLE"));
3446 assert!(output.contains("summary"));
3447 assert!(output.contains("failure"));
3448 assert!(output.contains("Fixed the auth bug"));
3449 assert!(output.contains("repo"));
3450 assert!(output.contains("0.70"));
3451 }
3452
3453 #[test]
3454 fn test_format_knowledge_long_title_truncated() {
3455 use chrono::Utc;
3456 use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3457 use uuid::Uuid;
3458
3459 let items = vec![Knowledge {
3460 id: Uuid::new_v4(),
3461 session_id: Uuid::new_v4(),
3462 kind: KnowledgeKind::Summary,
3463 scope_repo: Some("/repo".into()),
3464 scope_ink: None,
3465 title: "A very long title that exceeds the maximum display width for knowledge items in the CLI".into(),
3466 body: "Body".into(),
3467 tags: vec![],
3468 relevance: 0.5,
3469 created_at: Utc::now(),
3470 }];
3471
3472 let output = format_knowledge(&items);
3473 assert!(output.contains('…'));
3474 }
3475
3476 #[test]
3477 fn test_cli_parse_knowledge_get() {
3478 let cli = Cli::try_parse_from(["pulpo", "knowledge", "--get", "abc-123"]).unwrap();
3479 match &cli.command {
3480 Commands::Knowledge { get, .. } => {
3481 assert_eq!(get.as_deref(), Some("abc-123"));
3482 }
3483 _ => panic!("expected Knowledge command"),
3484 }
3485 }
3486
3487 #[test]
3488 fn test_cli_parse_knowledge_delete() {
3489 let cli = Cli::try_parse_from(["pulpo", "knowledge", "--delete", "abc-123"]).unwrap();
3490 match &cli.command {
3491 Commands::Knowledge { delete, .. } => {
3492 assert_eq!(delete.as_deref(), Some("abc-123"));
3493 }
3494 _ => panic!("expected Knowledge command"),
3495 }
3496 }
3497
3498 #[test]
3499 fn test_cli_parse_knowledge_push() {
3500 let cli = Cli::try_parse_from(["pulpo", "knowledge", "--push"]).unwrap();
3501 match &cli.command {
3502 Commands::Knowledge { push, .. } => {
3503 assert!(*push);
3504 }
3505 _ => panic!("expected Knowledge command"),
3506 }
3507 }
3508
3509 #[test]
3510 fn test_format_knowledge_no_repo() {
3511 use chrono::Utc;
3512 use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
3513 use uuid::Uuid;
3514
3515 let items = vec![Knowledge {
3516 id: Uuid::new_v4(),
3517 session_id: Uuid::new_v4(),
3518 kind: KnowledgeKind::Summary,
3519 scope_repo: None,
3520 scope_ink: None,
3521 title: "Global finding".into(),
3522 body: "Body".into(),
3523 tags: vec![],
3524 relevance: 0.5,
3525 created_at: Utc::now(),
3526 }];
3527
3528 let output = format_knowledge(&items);
3529 assert!(output.contains('-')); }
3531}