1use anyhow::Result;
2use clap::{Parser, Subcommand};
3use pulpo_common::api::{
4 AuthTokenResponse, CreateSessionResponse, InterventionEventResponse, KnowledgeDeleteResponse,
5 KnowledgeItemResponse, KnowledgePushResponse, KnowledgeResponse, PeersResponse,
6};
7use pulpo_common::knowledge::Knowledge;
8use pulpo_common::session::Session;
9
// Top-level command-line arguments for the `pulpo` client.
//
// NOTE: plain `//` comments are used on purpose in this type: clap's derive
// turns `///` doc comments into help text, so adding them would change the
// program's `--help` output.
#[derive(Parser, Debug)]
#[command(
    name = "pulpo",
    about = "Manage agent sessions across your machines",
    version
)]
pub struct Cli {
    // `host:port` of the daemon to talk to; `base_url` prefixes it with `http://`.
    #[arg(long, default_value = "localhost:7433")]
    pub node: String,

    // Explicit bearer token. When absent, `resolve_token` tries to discover one,
    // but only for nodes that `is_localhost` accepts.
    #[arg(long)]
    pub token: Option<String>,

    // The subcommand to run; dispatched in `execute`.
    #[command(subcommand)]
    pub command: Commands,
}
28
// All top-level subcommands; each variant is handled by one arm of `execute`.
//
// NOTE: plain `//` comments are used on purpose: clap's derive turns `///`
// doc comments into help text, so adding them would change `--help` output.
#[derive(Subcommand, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Commands {
    // Looks the session up over HTTP, then attaches to it locally via tmux.
    #[command(visible_alias = "a")]
    Attach {
        // Session name used in the `/api/v1/sessions/{name}` path.
        name: String,
    },

    // Posts text to a session's `/input` endpoint.
    #[command(visible_alias = "i")]
    Input {
        // Session name.
        name: String,
        // Text to send; `execute` sends a single newline when omitted.
        text: Option<String>,
    },

    // Creates a new session via `POST /api/v1/sessions`.
    #[command(visible_alias = "s")]
    Spawn {
        // Working directory; `execute` falls back to the current directory.
        #[arg(long)]
        workdir: Option<String>,

        // Optional session name, sent as `name`.
        #[arg(long)]
        name: Option<String>,

        // Optional provider, sent as `provider`.
        #[arg(long)]
        provider: Option<String>,

        // Selects mode "autonomous" instead of "interactive".
        #[arg(long)]
        auto: bool,

        // Sent as `unrestricted: true` when set.
        #[arg(long)]
        unrestricted: bool,

        // Optional model, sent as `model`.
        #[arg(long)]
        model: Option<String>,

        // Optional system prompt, sent as `system_prompt`.
        #[arg(long)]
        system_prompt: Option<String>,

        // Comma-separated list, sent as the `allowed_tools` array.
        #[arg(long, value_delimiter = ',')]
        allowed_tools: Option<Vec<String>>,

        // Sent verbatim as `ink`; its meaning is defined by the server
        // (not visible in this file).
        #[arg(long)]
        ink: Option<String>,

        // Sent as `max_turns`.
        #[arg(long)]
        max_turns: Option<u32>,

        // Sent as `max_budget_usd`.
        #[arg(long)]
        max_budget: Option<f64>,

        // Sent as `output_format`.
        #[arg(long)]
        output_format: Option<String>,

        // Positional words; joined with single spaces into the `prompt` field.
        prompt: Vec<String>,
    },

    // Lists sessions (`GET /api/v1/sessions`).
    #[command(visible_alias = "ls")]
    List,

    // Prints (or follows) a session's captured output.
    #[command(visible_alias = "l")]
    Logs {
        // Session name.
        name: String,

        // Number of output lines requested from the server.
        #[arg(long, default_value = "100")]
        lines: usize,

        // Keep polling and printing new output until the session ends.
        #[arg(short, long)]
        follow: bool,
    },

    // Posts to the session's `/kill` endpoint.
    #[command(visible_alias = "k")]
    Kill {
        // Session name.
        name: String,
    },

    // Issues `DELETE /api/v1/sessions/{name}`.
    #[command(visible_alias = "rm")]
    Delete {
        // Session name.
        name: String,
    },

    // Posts to the session's `/resume` endpoint.
    #[command(visible_alias = "r")]
    Resume {
        // Session name.
        name: String,
    },

    // Lists the local node and its peers (`GET /api/v1/peers`).
    #[command(visible_alias = "n")]
    Nodes,

    // Lists intervention events recorded for a session.
    #[command(visible_alias = "iv")]
    Interventions {
        // Session name.
        name: String,
    },

    // Opens the node's base URL in the system browser.
    Ui,

    // Queries or manages knowledge items. In `execute`, `get`, `delete` and
    // `push` are checked in that order and each returns early; otherwise the
    // remaining flags become query parameters of a listing request.
    #[command(visible_alias = "kn")]
    Knowledge {
        // Listing filter, sent as `session_id` (ignored with `--context`).
        #[arg(long)]
        session: Option<String>,

        // Listing filter, sent as `kind` (ignored with `--context`).
        #[arg(long)]
        kind: Option<String>,

        // Sent as `repo`, or as `workdir` when `--context` is set.
        #[arg(long)]
        repo: Option<String>,

        // Sent verbatim as `ink`.
        #[arg(long)]
        ink: Option<String>,

        // Sent as `limit`.
        #[arg(long, default_value = "20")]
        limit: usize,

        // Query `/api/v1/knowledge/context` instead of `/api/v1/knowledge`.
        #[arg(long)]
        context: bool,

        // Fetch a single item by id.
        #[arg(long)]
        get: Option<String>,

        // Delete a single item by id.
        #[arg(long)]
        delete: Option<String>,

        // Post to `/api/v1/knowledge/push` and print the returned message.
        #[arg(long)]
        push: bool,
    },

    // Manages crontab-backed schedules; see `ScheduleAction`.
    #[command(visible_alias = "sched")]
    Schedule {
        #[command(subcommand)]
        action: ScheduleAction,
    },
}
204
// Actions of the `schedule` subcommand. `execute_schedule` implements each
// one by reading the user's crontab, editing the lines tagged with
// `CRONTAB_TAG`, and writing the result back.
//
// NOTE: plain `//` comments are used on purpose: clap's derive turns `///`
// doc comments into help text, so adding them would change `--help` output.
#[derive(Subcommand, Debug)]
pub enum ScheduleAction {
    // Appends a new tagged crontab entry that runs `pulpo spawn --auto`.
    Install {
        // Schedule name; stored in the entry's trailing tag.
        name: String,
        // Cron expression, emitted verbatim at the start of the entry.
        cron: String,
        // Working directory passed to the scheduled `spawn`.
        #[arg(long)]
        workdir: String,
        // Provider passed to the scheduled `spawn`.
        #[arg(long, default_value = "claude")]
        provider: String,
        // Prompt words; joined with single spaces.
        prompt: Vec<String>,
    },
    // Prints all tagged entries.
    #[command(alias = "ls")]
    List,
    // Deletes the entry with the given name.
    #[command(alias = "rm")]
    Remove {
        name: String,
    },
    // Comments the entry out by prefixing it with `#`.
    Pause {
        name: String,
    },
    // Re-enables a paused entry by stripping its leading `#` characters.
    Resume {
        name: String,
    },
}
242
/// Builds the HTTP base URL for a node given as `host:port`.
///
/// The scheme is always plain `http`; `node` is appended unchanged.
pub fn base_url(node: &str) -> String {
    let mut url = String::with_capacity("http://".len() + node.len());
    url.push_str("http://");
    url.push_str(node);
    url
}
247
/// JSON body of the session output endpoint, as consumed by `fetch_output`.
#[derive(serde::Deserialize)]
struct OutputResponse {
    /// Captured session output text.
    output: String,
}
253
254fn format_sessions(sessions: &[Session]) -> String {
256 if sessions.is_empty() {
257 return "No sessions.".into();
258 }
259 let mut lines = vec![format!(
260 "{:<20} {:<12} {:<10} {:<14} {}",
261 "NAME", "STATUS", "PROVIDER", "MODE", "PROMPT"
262 )];
263 for s in sessions {
264 let prompt_display = if s.prompt.len() > 40 {
265 format!("{}...", &s.prompt[..37])
266 } else {
267 s.prompt.clone()
268 };
269 let status_display = if s.waiting_for_input {
270 "waiting".to_owned()
271 } else {
272 s.status.to_string()
273 };
274 lines.push(format!(
275 "{:<20} {:<12} {:<10} {:<14} {}",
276 s.name, status_display, s.provider, s.mode, prompt_display
277 ));
278 }
279 lines.join("\n")
280}
281
282fn format_nodes(resp: &PeersResponse) -> String {
284 let mut lines = vec![format!(
285 "{:<20} {:<25} {:<10} {}",
286 "NAME", "ADDRESS", "STATUS", "SESSIONS"
287 )];
288 lines.push(format!(
289 "{:<20} {:<25} {:<10} {}",
290 resp.local.name, "(local)", "online", "-"
291 ));
292 for p in &resp.peers {
293 let sessions = p
294 .session_count
295 .map_or_else(|| "-".into(), |c| c.to_string());
296 lines.push(format!(
297 "{:<20} {:<25} {:<10} {}",
298 p.name, p.address, p.status, sessions
299 ));
300 }
301 lines.join("\n")
302}
303
304fn format_interventions(events: &[InterventionEventResponse]) -> String {
306 if events.is_empty() {
307 return "No intervention events.".into();
308 }
309 let mut lines = vec![format!("{:<8} {:<20} {}", "ID", "TIMESTAMP", "REASON")];
310 for e in events {
311 lines.push(format!("{:<8} {:<20} {}", e.id, e.created_at, e.reason));
312 }
313 lines.join("\n")
314}
315
316fn format_knowledge(items: &[Knowledge]) -> String {
318 if items.is_empty() {
319 return "No knowledge found.".into();
320 }
321 let mut lines = vec![format!(
322 "{:<10} {:<40} {:<10} {:<6} {}",
323 "KIND", "TITLE", "REPO", "REL", "TAGS"
324 )];
325 for k in items {
326 let title = if k.title.len() > 38 {
327 format!("{}…", &k.title[..37])
328 } else {
329 k.title.clone()
330 };
331 let repo = k
332 .scope_repo
333 .as_deref()
334 .and_then(|r| r.rsplit('/').next())
335 .unwrap_or("-");
336 let tags = k.tags.join(",");
337 lines.push(format!(
338 "{:<10} {:<40} {:<10} {:<6.2} {}",
339 k.kind, title, repo, k.relevance, tags
340 ));
341 }
342 lines.join("\n")
343}
344
/// Builds (but does not run) the command that opens `url` in a browser.
///
/// Uses `open` on macOS and `xdg-open` on every other target, with `url` as
/// the only argument.
#[cfg_attr(coverage, allow(dead_code))]
fn build_open_command(url: &str) -> std::process::Command {
    #[cfg(target_os = "macos")]
    const OPENER: &str = "open";
    // Linux and any other platform both go through `xdg-open`.
    #[cfg(not(target_os = "macos"))]
    const OPENER: &str = "xdg-open";

    let mut opener = std::process::Command::new(OPENER);
    opener.arg(url);
    opener
}
368
369#[cfg(not(coverage))]
371fn open_browser(url: &str) -> Result<()> {
372 build_open_command(url).status()?;
373 Ok(())
374}
375
/// Coverage-build stand-in for `open_browser`: launches nothing and always
/// succeeds.
#[cfg(coverage)]
fn open_browser(_url: &str) -> Result<()> {
    Ok(())
}
381
/// Builds (but does not run) `tmux attach-session -t <backend_session_id>`.
#[cfg_attr(coverage, allow(dead_code))]
fn build_attach_command(backend_session_id: &str) -> std::process::Command {
    let mut tmux = std::process::Command::new("tmux");
    tmux.arg("attach-session")
        .arg("-t")
        .arg(backend_session_id);
    tmux
}
390
391#[cfg(not(any(test, coverage)))]
393fn attach_session(backend_session_id: &str) -> Result<()> {
394 let status = build_attach_command(backend_session_id).status()?;
395 if !status.success() {
396 anyhow::bail!("attach failed with {status}");
397 }
398 Ok(())
399}
400
/// Test/coverage stand-in for `attach_session`: does not launch tmux and
/// always succeeds.
#[cfg(any(test, coverage))]
#[allow(clippy::unnecessary_wraps, clippy::missing_const_for_fn)]
fn attach_session(_backend_session_id: &str) -> Result<()> {
    Ok(())
}
407
408fn api_error(text: &str) -> anyhow::Error {
410 serde_json::from_str::<serde_json::Value>(text)
411 .ok()
412 .and_then(|v| v["error"].as_str().map(String::from))
413 .map_or_else(|| anyhow::anyhow!("{text}"), |msg| anyhow::anyhow!("{msg}"))
414}
415
416async fn ok_or_api_error(resp: reqwest::Response) -> Result<String> {
418 if resp.status().is_success() {
419 Ok(resp.text().await?)
420 } else {
421 let text = resp.text().await?;
422 Err(api_error(&text))
423 }
424}
425
426fn friendly_error(err: &reqwest::Error, node: &str) -> anyhow::Error {
428 if err.is_connect() {
429 anyhow::anyhow!(
430 "Could not connect to pulpod at {node}. Is the daemon running?\nStart it with: brew services start pulpo"
431 )
432 } else {
433 anyhow::anyhow!("Network error connecting to {node}: {err}")
434 }
435}
436
/// Reports whether `node` names the local machine.
///
/// Accepts `localhost` and `127.0.0.1` (with or without a `:port` suffix),
/// the bare IPv6 loopback `::1`, and anything starting with `[::1]`.
fn is_localhost(node: &str) -> bool {
    // Everything before the first ':' (or the whole string when there is none).
    let host = node.split_once(':').map_or(node, |(before, _)| before);
    matches!(host, "localhost" | "127.0.0.1") || node == "::1" || node.starts_with("[::1]")
}
442
443async fn discover_token(client: &reqwest::Client, base: &str) -> Option<String> {
445 let resp = client
446 .get(format!("{base}/api/v1/auth/token"))
447 .send()
448 .await
449 .ok()?;
450 let body: AuthTokenResponse = resp.json().await.ok()?;
451 if body.token.is_empty() {
452 None
453 } else {
454 Some(body.token)
455 }
456}
457
458async fn resolve_token(
460 client: &reqwest::Client,
461 base: &str,
462 node: &str,
463 explicit: Option<&str>,
464) -> Option<String> {
465 if let Some(t) = explicit {
466 return Some(t.to_owned());
467 }
468 if is_localhost(node) {
469 return discover_token(client, base).await;
470 }
471 None
472}
473
474fn authed_get(
476 client: &reqwest::Client,
477 url: String,
478 token: Option<&str>,
479) -> reqwest::RequestBuilder {
480 let req = client.get(url);
481 if let Some(t) = token {
482 req.bearer_auth(t)
483 } else {
484 req
485 }
486}
487
488fn authed_post(
490 client: &reqwest::Client,
491 url: String,
492 token: Option<&str>,
493) -> reqwest::RequestBuilder {
494 let req = client.post(url);
495 if let Some(t) = token {
496 req.bearer_auth(t)
497 } else {
498 req
499 }
500}
501
502fn authed_delete(
504 client: &reqwest::Client,
505 url: String,
506 token: Option<&str>,
507) -> reqwest::RequestBuilder {
508 let req = client.delete(url);
509 if let Some(t) = token {
510 req.bearer_auth(t)
511 } else {
512 req
513 }
514}
515
516async fn fetch_output(
518 client: &reqwest::Client,
519 base: &str,
520 name: &str,
521 lines: usize,
522 token: Option<&str>,
523) -> Result<String> {
524 let resp = authed_get(
525 client,
526 format!("{base}/api/v1/sessions/{name}/output?lines={lines}"),
527 token,
528 )
529 .send()
530 .await?;
531 let text = ok_or_api_error(resp).await?;
532 let output: OutputResponse = serde_json::from_str(&text)?;
533 Ok(output.output)
534}
535
536async fn fetch_session_status(
538 client: &reqwest::Client,
539 base: &str,
540 name: &str,
541 token: Option<&str>,
542) -> Result<String> {
543 let resp = authed_get(client, format!("{base}/api/v1/sessions/{name}"), token)
544 .send()
545 .await?;
546 let text = ok_or_api_error(resp).await?;
547 let session: Session = serde_json::from_str(&text)?;
548 Ok(session.status.to_string())
549}
550
/// Returns the part of `new` that was not already shown at the end of `prev`.
///
/// Both arguments are snapshots of a scrolling text window. The function
/// looks for the latest position in `new` where the tail of `prev` reappears
/// line-for-line and returns everything after it:
///
/// * `prev` empty: all of `new` is returned.
/// * `new` has no lines, or ends exactly where `prev` ended: `""`.
/// * no overlap found: all of `new` is returned.
///
/// Lines are compared without their terminators, so both `\n` and `\r\n`
/// endings are handled.
fn diff_output<'a>(prev: &str, new: &'a str) -> &'a str {
    if prev.is_empty() {
        return new;
    }

    let prev_lines: Vec<&str> = prev.lines().collect();
    let new_lines: Vec<&str> = new.lines().collect();
    // Same split as `lines()` but with terminators kept, so summing segment
    // lengths gives exact byte offsets into `new` whatever the line ending.
    let new_segments: Vec<&str> = new.split_inclusive('\n').collect();

    if new_lines.is_empty() {
        return "";
    }

    let last_prev = match prev_lines.last() {
        Some(line) => *line,
        // Unreachable for a non-empty `prev`; treat as "nothing to compare".
        None => return new,
    };

    // Scan from the end so the latest candidate overlap wins.
    for i in (0..new_lines.len()).rev() {
        if new_lines[i] != last_prev {
            continue;
        }
        // Compare as many lines as both snapshots have up to this point.
        let overlap_len = prev_lines.len().min(i + 1);
        let prev_tail = &prev_lines[prev_lines.len() - overlap_len..];
        let new_overlap = &new_lines[i + 1 - overlap_len..=i];
        if prev_tail != new_overlap {
            continue;
        }
        if i + 1 == new_lines.len() {
            // The overlap reaches the end of `new`: nothing is new.
            return "";
        }
        let consumed: usize = new_segments
            .iter()
            .take(i + 1)
            .map(|segment| segment.len())
            .sum();
        return new.get(consumed..).unwrap_or("");
    }

    new
}
593
594async fn follow_logs(
596 client: &reqwest::Client,
597 base: &str,
598 name: &str,
599 lines: usize,
600 token: Option<&str>,
601 writer: &mut (dyn std::io::Write + Send),
602) -> Result<()> {
603 let mut prev_output = fetch_output(client, base, name, lines, token).await?;
604 write!(writer, "{prev_output}")?;
605
606 loop {
607 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
608
609 let status = fetch_session_status(client, base, name, token).await?;
611 let is_terminal = status == "completed" || status == "dead" || status == "stale";
612
613 let new_output = fetch_output(client, base, name, lines, token).await?;
615
616 let diff = diff_output(&prev_output, &new_output);
617 if !diff.is_empty() {
618 write!(writer, "{diff}")?;
619 }
620 prev_output = new_output;
621
622 if is_terminal {
623 break;
624 }
625 }
626 Ok(())
627}
628
/// Marker that identifies crontab lines managed by pulpo. `build_crontab_line`
/// ends each entry with this marker followed by the schedule name.
#[cfg_attr(coverage, allow(dead_code))]
const CRONTAB_TAG: &str = "#pulpo:";
633
634#[cfg(not(coverage))]
636fn read_crontab() -> Result<String> {
637 let output = std::process::Command::new("crontab").arg("-l").output()?;
638 if output.status.success() {
639 Ok(String::from_utf8_lossy(&output.stdout).to_string())
640 } else {
641 Ok(String::new())
642 }
643}
644
645#[cfg(not(coverage))]
647fn write_crontab(content: &str) -> Result<()> {
648 use std::io::Write;
649 let mut child = std::process::Command::new("crontab")
650 .arg("-")
651 .stdin(std::process::Stdio::piped())
652 .spawn()?;
653 child
654 .stdin
655 .as_mut()
656 .unwrap()
657 .write_all(content.as_bytes())?;
658 let status = child.wait()?;
659 if !status.success() {
660 anyhow::bail!("crontab write failed");
661 }
662 Ok(())
663}
664
665#[cfg_attr(coverage, allow(dead_code))]
667fn build_crontab_line(
668 name: &str,
669 cron: &str,
670 workdir: &str,
671 provider: &str,
672 prompt: &str,
673 node: &str,
674) -> String {
675 format!(
676 "{cron} pulpo --node {node} spawn --workdir {workdir} --provider {provider} --auto {prompt} {CRONTAB_TAG}{name}\n"
677 )
678}
679
680#[cfg_attr(coverage, allow(dead_code))]
682fn crontab_install(crontab: &str, name: &str, line: &str) -> Result<String> {
683 let tag = format!("{CRONTAB_TAG}{name}");
684 if crontab.contains(&tag) {
685 anyhow::bail!("schedule \"{name}\" already exists — remove it first");
686 }
687 let mut result = crontab.to_owned();
688 result.push_str(line);
689 Ok(result)
690}
691
692#[cfg_attr(coverage, allow(dead_code))]
694fn crontab_list(crontab: &str) -> String {
695 let entries: Vec<&str> = crontab
696 .lines()
697 .filter(|l| l.contains(CRONTAB_TAG))
698 .collect();
699 if entries.is_empty() {
700 return "No pulpo schedules.".into();
701 }
702 let mut lines = vec![format!("{:<20} {:<15} {}", "NAME", "CRON", "PAUSED")];
703 for entry in entries {
704 let paused = entry.starts_with('#');
705 let raw = entry.trim_start_matches('#').trim();
706 let name = raw.rsplit_once(CRONTAB_TAG).map_or("?", |(_, n)| n);
707 let parts: Vec<&str> = raw.splitn(6, ' ').collect();
708 let cron_expr = if parts.len() >= 5 {
709 parts[..5].join(" ")
710 } else {
711 "?".into()
712 };
713 lines.push(format!(
714 "{:<20} {:<15} {}",
715 name,
716 cron_expr,
717 if paused { "yes" } else { "no" }
718 ));
719 }
720 lines.join("\n")
721}
722
723#[cfg_attr(coverage, allow(dead_code))]
725fn crontab_remove(crontab: &str, name: &str) -> Result<String> {
726 use std::fmt::Write;
727 let tag = format!("{CRONTAB_TAG}{name}");
728 let filtered =
729 crontab
730 .lines()
731 .filter(|l| !l.contains(&tag))
732 .fold(String::new(), |mut acc, l| {
733 writeln!(acc, "{l}").unwrap();
734 acc
735 });
736 if filtered.len() == crontab.len() {
737 anyhow::bail!("schedule \"{name}\" not found");
738 }
739 Ok(filtered)
740}
741
742#[cfg_attr(coverage, allow(dead_code))]
744fn crontab_pause(crontab: &str, name: &str) -> Result<String> {
745 use std::fmt::Write;
746 let tag = format!("{CRONTAB_TAG}{name}");
747 let mut found = false;
748 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
749 if l.contains(&tag) && !l.starts_with('#') {
750 found = true;
751 writeln!(acc, "#{l}").unwrap();
752 } else {
753 writeln!(acc, "{l}").unwrap();
754 }
755 acc
756 });
757 if !found {
758 anyhow::bail!("schedule \"{name}\" not found or already paused");
759 }
760 Ok(updated)
761}
762
763#[cfg_attr(coverage, allow(dead_code))]
765fn crontab_resume(crontab: &str, name: &str) -> Result<String> {
766 use std::fmt::Write;
767 let tag = format!("{CRONTAB_TAG}{name}");
768 let mut found = false;
769 let updated = crontab.lines().fold(String::new(), |mut acc, l| {
770 if l.contains(&tag) && l.starts_with('#') {
771 found = true;
772 writeln!(acc, "{}", l.trim_start_matches('#')).unwrap();
773 } else {
774 writeln!(acc, "{l}").unwrap();
775 }
776 acc
777 });
778 if !found {
779 anyhow::bail!("schedule \"{name}\" not found or not paused");
780 }
781 Ok(updated)
782}
783
784#[cfg(not(coverage))]
786fn execute_schedule(action: &ScheduleAction, node: &str) -> Result<String> {
787 match action {
788 ScheduleAction::Install {
789 name,
790 cron,
791 workdir,
792 provider,
793 prompt,
794 } => {
795 let crontab = read_crontab()?;
796 let joined_prompt = prompt.join(" ");
797 let line = build_crontab_line(name, cron, workdir, provider, &joined_prompt, node);
798 let updated = crontab_install(&crontab, name, &line)?;
799 write_crontab(&updated)?;
800 Ok(format!("Installed schedule \"{name}\""))
801 }
802 ScheduleAction::List => {
803 let crontab = read_crontab()?;
804 Ok(crontab_list(&crontab))
805 }
806 ScheduleAction::Remove { name } => {
807 let crontab = read_crontab()?;
808 let updated = crontab_remove(&crontab, name)?;
809 write_crontab(&updated)?;
810 Ok(format!("Removed schedule \"{name}\""))
811 }
812 ScheduleAction::Pause { name } => {
813 let crontab = read_crontab()?;
814 let updated = crontab_pause(&crontab, name)?;
815 write_crontab(&updated)?;
816 Ok(format!("Paused schedule \"{name}\""))
817 }
818 ScheduleAction::Resume { name } => {
819 let crontab = read_crontab()?;
820 let updated = crontab_resume(&crontab, name)?;
821 write_crontab(&updated)?;
822 Ok(format!("Resumed schedule \"{name}\""))
823 }
824 }
825}
826
/// Coverage-build stand-in for `execute_schedule`: touches no crontab and
/// returns an empty message.
#[cfg(coverage)]
fn execute_schedule(_action: &ScheduleAction, _node: &str) -> Result<String> {
    Ok(String::new())
}
832
833#[allow(clippy::too_many_lines)]
835pub async fn execute(cli: &Cli) -> Result<String> {
836 let url = base_url(&cli.node);
837 let client = reqwest::Client::new();
838 let node = &cli.node;
839 let token = resolve_token(&client, &url, node, cli.token.as_deref()).await;
840
841 match &cli.command {
842 Commands::Attach { name } => {
843 let resp = authed_get(
845 &client,
846 format!("{url}/api/v1/sessions/{name}"),
847 token.as_deref(),
848 )
849 .send()
850 .await
851 .map_err(|e| friendly_error(&e, node))?;
852 let text = ok_or_api_error(resp).await?;
853 let session: Session = serde_json::from_str(&text)?;
854 let backend_id = session.backend_session_id.unwrap_or_else(|| name.clone());
855 attach_session(&backend_id)?;
856 Ok(format!("Detached from session {name}."))
857 }
858 Commands::Input { name, text } => {
859 let input_text = text.as_deref().unwrap_or("\n");
860 let body = serde_json::json!({ "text": input_text });
861 let resp = authed_post(
862 &client,
863 format!("{url}/api/v1/sessions/{name}/input"),
864 token.as_deref(),
865 )
866 .json(&body)
867 .send()
868 .await
869 .map_err(|e| friendly_error(&e, node))?;
870 ok_or_api_error(resp).await?;
871 Ok(format!("Sent input to session {name}."))
872 }
873 Commands::List => {
874 let resp = authed_get(&client, format!("{url}/api/v1/sessions"), token.as_deref())
875 .send()
876 .await
877 .map_err(|e| friendly_error(&e, node))?;
878 let text = ok_or_api_error(resp).await?;
879 let sessions: Vec<Session> = serde_json::from_str(&text)?;
880 Ok(format_sessions(&sessions))
881 }
882 Commands::Nodes => {
883 let resp = authed_get(&client, format!("{url}/api/v1/peers"), token.as_deref())
884 .send()
885 .await
886 .map_err(|e| friendly_error(&e, node))?;
887 let text = ok_or_api_error(resp).await?;
888 let resp: PeersResponse = serde_json::from_str(&text)?;
889 Ok(format_nodes(&resp))
890 }
891 Commands::Spawn {
892 workdir,
893 name,
894 provider,
895 auto,
896 unrestricted,
897 model,
898 system_prompt,
899 allowed_tools,
900 ink,
901 max_turns,
902 max_budget,
903 output_format,
904 prompt,
905 } => {
906 let prompt_text = prompt.join(" ");
907 let mode = if *auto { "autonomous" } else { "interactive" };
908 let resolved_workdir = workdir.clone().unwrap_or_else(|| {
910 std::env::current_dir()
911 .map_or_else(|_| ".".into(), |p| p.to_string_lossy().into_owned())
912 });
913 let mut body = serde_json::json!({
914 "workdir": resolved_workdir,
915 "mode": mode,
916 });
917 if !prompt_text.is_empty() {
919 body["prompt"] = serde_json::json!(prompt_text);
920 }
921 if let Some(p) = provider {
923 body["provider"] = serde_json::json!(p);
924 }
925 if *unrestricted {
926 body["unrestricted"] = serde_json::json!(true);
927 }
928 if let Some(n) = name {
929 body["name"] = serde_json::json!(n);
930 }
931 if let Some(m) = model {
932 body["model"] = serde_json::json!(m);
933 }
934 if let Some(sp) = system_prompt {
935 body["system_prompt"] = serde_json::json!(sp);
936 }
937 if let Some(tools) = allowed_tools {
938 body["allowed_tools"] = serde_json::json!(tools);
939 }
940 if let Some(p) = ink {
941 body["ink"] = serde_json::json!(p);
942 }
943 if let Some(mt) = max_turns {
944 body["max_turns"] = serde_json::json!(mt);
945 }
946 if let Some(mb) = max_budget {
947 body["max_budget_usd"] = serde_json::json!(mb);
948 }
949 if let Some(of) = output_format {
950 body["output_format"] = serde_json::json!(of);
951 }
952 let resp = authed_post(&client, format!("{url}/api/v1/sessions"), token.as_deref())
953 .json(&body)
954 .send()
955 .await
956 .map_err(|e| friendly_error(&e, node))?;
957 let text = ok_or_api_error(resp).await?;
958 let resp: CreateSessionResponse = serde_json::from_str(&text)?;
959 let mut msg = format!(
960 "Created session \"{}\" ({})",
961 resp.session.name, resp.session.id
962 );
963 for w in &resp.warnings {
964 use std::fmt::Write;
965 let _ = write!(msg, "\n Warning: {w}");
966 }
967 Ok(msg)
968 }
969 Commands::Kill { name } => {
970 let resp = authed_post(
971 &client,
972 format!("{url}/api/v1/sessions/{name}/kill"),
973 token.as_deref(),
974 )
975 .send()
976 .await
977 .map_err(|e| friendly_error(&e, node))?;
978 ok_or_api_error(resp).await?;
979 Ok(format!("Session {name} killed."))
980 }
981 Commands::Delete { name } => {
982 let resp = authed_delete(
983 &client,
984 format!("{url}/api/v1/sessions/{name}"),
985 token.as_deref(),
986 )
987 .send()
988 .await
989 .map_err(|e| friendly_error(&e, node))?;
990 ok_or_api_error(resp).await?;
991 Ok(format!("Session {name} deleted."))
992 }
993 Commands::Logs {
994 name,
995 lines,
996 follow,
997 } => {
998 if *follow {
999 let mut stdout = std::io::stdout();
1000 follow_logs(&client, &url, name, *lines, token.as_deref(), &mut stdout)
1001 .await
1002 .map_err(|e| {
1003 match e.downcast::<reqwest::Error>() {
1005 Ok(re) => friendly_error(&re, node),
1006 Err(other) => other,
1007 }
1008 })?;
1009 Ok(String::new())
1010 } else {
1011 let output = fetch_output(&client, &url, name, *lines, token.as_deref())
1012 .await
1013 .map_err(|e| match e.downcast::<reqwest::Error>() {
1014 Ok(re) => friendly_error(&re, node),
1015 Err(other) => other,
1016 })?;
1017 Ok(output)
1018 }
1019 }
1020 Commands::Interventions { name } => {
1021 let resp = authed_get(
1022 &client,
1023 format!("{url}/api/v1/sessions/{name}/interventions"),
1024 token.as_deref(),
1025 )
1026 .send()
1027 .await
1028 .map_err(|e| friendly_error(&e, node))?;
1029 let text = ok_or_api_error(resp).await?;
1030 let events: Vec<InterventionEventResponse> = serde_json::from_str(&text)?;
1031 Ok(format_interventions(&events))
1032 }
1033 Commands::Ui => {
1034 let dashboard = base_url(&cli.node);
1035 open_browser(&dashboard)?;
1036 Ok(format!("Opening {dashboard}"))
1037 }
1038 Commands::Resume { name } => {
1039 let resp = authed_post(
1040 &client,
1041 format!("{url}/api/v1/sessions/{name}/resume"),
1042 token.as_deref(),
1043 )
1044 .send()
1045 .await
1046 .map_err(|e| friendly_error(&e, node))?;
1047 let text = ok_or_api_error(resp).await?;
1048 let session: Session = serde_json::from_str(&text)?;
1049 Ok(format!("Resumed session \"{}\"", session.name))
1050 }
1051 Commands::Knowledge {
1052 session,
1053 kind,
1054 repo,
1055 ink,
1056 limit,
1057 context,
1058 get,
1059 delete,
1060 push,
1061 } => {
1062 if let Some(id) = get {
1064 let endpoint = format!("{url}/api/v1/knowledge/{id}");
1065 let resp = authed_get(&client, endpoint, token.as_deref())
1066 .send()
1067 .await
1068 .map_err(|e| friendly_error(&e, node))?;
1069 let text = ok_or_api_error(resp).await?;
1070 let resp: KnowledgeItemResponse = serde_json::from_str(&text)?;
1071 return Ok(format_knowledge(&[resp.knowledge]));
1072 }
1073
1074 if let Some(id) = delete {
1076 let endpoint = format!("{url}/api/v1/knowledge/{id}");
1077 let resp = authed_delete(&client, endpoint, token.as_deref())
1078 .send()
1079 .await
1080 .map_err(|e| friendly_error(&e, node))?;
1081 let text = ok_or_api_error(resp).await?;
1082 let resp: KnowledgeDeleteResponse = serde_json::from_str(&text)?;
1083 return Ok(if resp.deleted {
1084 format!("Deleted knowledge item {id}")
1085 } else {
1086 format!("Knowledge item {id} not found")
1087 });
1088 }
1089
1090 if *push {
1092 let endpoint = format!("{url}/api/v1/knowledge/push");
1093 let resp = authed_post(&client, endpoint, token.as_deref())
1094 .send()
1095 .await
1096 .map_err(|e| friendly_error(&e, node))?;
1097 let text = ok_or_api_error(resp).await?;
1098 let resp: KnowledgePushResponse = serde_json::from_str(&text)?;
1099 return Ok(resp.message);
1100 }
1101
1102 let mut params = vec![format!("limit={limit}")];
1104 let endpoint = if *context {
1105 if let Some(r) = repo {
1106 params.push(format!("workdir={r}"));
1107 }
1108 if let Some(i) = ink {
1109 params.push(format!("ink={i}"));
1110 }
1111 format!("{url}/api/v1/knowledge/context?{}", params.join("&"))
1112 } else {
1113 if let Some(s) = session {
1114 params.push(format!("session_id={s}"));
1115 }
1116 if let Some(k) = kind {
1117 params.push(format!("kind={k}"));
1118 }
1119 if let Some(r) = repo {
1120 params.push(format!("repo={r}"));
1121 }
1122 if let Some(i) = ink {
1123 params.push(format!("ink={i}"));
1124 }
1125 format!("{url}/api/v1/knowledge?{}", params.join("&"))
1126 };
1127 let resp = authed_get(&client, endpoint, token.as_deref())
1128 .send()
1129 .await
1130 .map_err(|e| friendly_error(&e, node))?;
1131 let text = ok_or_api_error(resp).await?;
1132 let resp: KnowledgeResponse = serde_json::from_str(&text)?;
1133 Ok(format_knowledge(&resp.knowledge))
1134 }
1135 Commands::Schedule { action } => execute_schedule(action, node),
1136 }
1137}
1138
1139#[cfg(test)]
1140mod tests {
1141 use super::*;
1142
1143 #[test]
1144 fn test_base_url() {
1145 assert_eq!(base_url("localhost:7433"), "http://localhost:7433");
1146 assert_eq!(base_url("my-machine:9999"), "http://my-machine:9999");
1147 }
1148
1149 #[test]
1150 fn test_cli_parse_list() {
1151 let cli = Cli::try_parse_from(["pulpo", "list"]).unwrap();
1152 assert_eq!(cli.node, "localhost:7433");
1153 assert!(matches!(cli.command, Commands::List));
1154 }
1155
1156 #[test]
1157 fn test_cli_parse_nodes() {
1158 let cli = Cli::try_parse_from(["pulpo", "nodes"]).unwrap();
1159 assert!(matches!(cli.command, Commands::Nodes));
1160 }
1161
1162 #[test]
1163 fn test_cli_parse_ui() {
1164 let cli = Cli::try_parse_from(["pulpo", "ui"]).unwrap();
1165 assert!(matches!(cli.command, Commands::Ui));
1166 }
1167
1168 #[test]
1169 fn test_cli_parse_ui_custom_node() {
1170 let cli = Cli::try_parse_from(["pulpo", "--node", "mac-mini:7433", "ui"]).unwrap();
1171 assert!(matches!(cli.command, Commands::Ui));
1172 assert_eq!(cli.node, "mac-mini:7433");
1173 }
1174
1175 #[test]
1176 fn test_build_open_command() {
1177 let cmd = build_open_command("http://localhost:7433");
1178 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1179 assert_eq!(args, vec!["http://localhost:7433"]);
1180 #[cfg(target_os = "macos")]
1181 assert_eq!(cmd.get_program(), "open");
1182 #[cfg(target_os = "linux")]
1183 assert_eq!(cmd.get_program(), "xdg-open");
1184 }
1185
1186 #[test]
1187 fn test_cli_parse_spawn() {
1188 let cli = Cli::try_parse_from([
1189 "pulpo",
1190 "spawn",
1191 "--workdir",
1192 "/tmp/repo",
1193 "Fix",
1194 "the",
1195 "bug",
1196 ])
1197 .unwrap();
1198 assert!(matches!(
1199 &cli.command,
1200 Commands::Spawn { workdir, provider, auto, unrestricted, prompt, .. }
1201 if workdir.as_deref() == Some("/tmp/repo") && provider.is_none() && !auto
1202 && !unrestricted && prompt == &["Fix", "the", "bug"]
1203 ));
1204 }
1205
1206 #[test]
1207 fn test_cli_parse_spawn_with_provider() {
1208 let cli = Cli::try_parse_from([
1209 "pulpo",
1210 "spawn",
1211 "--workdir",
1212 "/tmp",
1213 "--provider",
1214 "codex",
1215 "Do it",
1216 ])
1217 .unwrap();
1218 assert!(matches!(
1219 &cli.command,
1220 Commands::Spawn { provider, .. } if provider.as_deref() == Some("codex")
1221 ));
1222 }
1223
1224 #[test]
1225 fn test_cli_parse_spawn_auto() {
1226 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "--auto", "Do it"])
1227 .unwrap();
1228 assert!(matches!(
1229 &cli.command,
1230 Commands::Spawn { auto, .. } if *auto
1231 ));
1232 }
1233
1234 #[test]
1235 fn test_cli_parse_spawn_unrestricted() {
1236 let cli = Cli::try_parse_from([
1237 "pulpo",
1238 "spawn",
1239 "--workdir",
1240 "/tmp",
1241 "--unrestricted",
1242 "Do it",
1243 ])
1244 .unwrap();
1245 assert!(matches!(
1246 &cli.command,
1247 Commands::Spawn { unrestricted, .. } if *unrestricted
1248 ));
1249 }
1250
1251 #[test]
1252 fn test_cli_parse_spawn_unrestricted_default() {
1253 let cli = Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp", "Do it"]).unwrap();
1254 assert!(matches!(
1255 &cli.command,
1256 Commands::Spawn { unrestricted, .. } if !unrestricted
1257 ));
1258 }
1259
1260 #[test]
1261 fn test_cli_parse_spawn_with_name() {
1262 let cli = Cli::try_parse_from([
1263 "pulpo",
1264 "spawn",
1265 "--workdir",
1266 "/tmp/repo",
1267 "--name",
1268 "my-task",
1269 "Fix it",
1270 ])
1271 .unwrap();
1272 assert!(matches!(
1273 &cli.command,
1274 Commands::Spawn { workdir, name, .. }
1275 if workdir.as_deref() == Some("/tmp/repo") && name.as_deref() == Some("my-task")
1276 ));
1277 }
1278
1279 #[test]
1280 fn test_cli_parse_spawn_without_name() {
1281 let cli =
1282 Cli::try_parse_from(["pulpo", "spawn", "--workdir", "/tmp/repo", "Fix it"]).unwrap();
1283 assert!(matches!(
1284 &cli.command,
1285 Commands::Spawn { name, .. } if name.is_none()
1286 ));
1287 }
1288
1289 #[test]
1290 fn test_cli_parse_logs() {
1291 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session"]).unwrap();
1292 assert!(matches!(
1293 &cli.command,
1294 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 100 && !follow
1295 ));
1296 }
1297
1298 #[test]
1299 fn test_cli_parse_logs_with_lines() {
1300 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--lines", "50"]).unwrap();
1301 assert!(matches!(
1302 &cli.command,
1303 Commands::Logs { name, lines, follow } if name == "my-session" && *lines == 50 && !follow
1304 ));
1305 }
1306
1307 #[test]
1308 fn test_cli_parse_logs_follow() {
1309 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "--follow"]).unwrap();
1310 assert!(matches!(
1311 &cli.command,
1312 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1313 ));
1314 }
1315
1316 #[test]
1317 fn test_cli_parse_logs_follow_short() {
1318 let cli = Cli::try_parse_from(["pulpo", "logs", "my-session", "-f"]).unwrap();
1319 assert!(matches!(
1320 &cli.command,
1321 Commands::Logs { name, follow, .. } if name == "my-session" && *follow
1322 ));
1323 }
1324
1325 #[test]
1326 fn test_cli_parse_kill() {
1327 let cli = Cli::try_parse_from(["pulpo", "kill", "my-session"]).unwrap();
1328 assert!(matches!(
1329 &cli.command,
1330 Commands::Kill { name } if name == "my-session"
1331 ));
1332 }
1333
1334 #[test]
1335 fn test_cli_parse_delete() {
1336 let cli = Cli::try_parse_from(["pulpo", "delete", "my-session"]).unwrap();
1337 assert!(matches!(
1338 &cli.command,
1339 Commands::Delete { name } if name == "my-session"
1340 ));
1341 }
1342
1343 #[test]
1344 fn test_cli_parse_resume() {
1345 let cli = Cli::try_parse_from(["pulpo", "resume", "my-session"]).unwrap();
1346 assert!(matches!(
1347 &cli.command,
1348 Commands::Resume { name } if name == "my-session"
1349 ));
1350 }
1351
/// `pulpo input <name> <text>` captures both the session name and the text.
#[test]
fn test_cli_parse_input() {
    let parsed = Cli::try_parse_from(["pulpo", "input", "my-session", "yes"]).unwrap();
    let Commands::Input { name, text } = &parsed.command else {
        panic!("expected Commands::Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("yes"));
}
1360
/// The text argument of `input` is optional and parses to `None` when omitted.
#[test]
fn test_cli_parse_input_no_text() {
    let parsed = Cli::try_parse_from(["pulpo", "input", "my-session"]).unwrap();
    let Commands::Input { name, text } = &parsed.command else {
        panic!("expected Commands::Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert!(text.is_none());
}
1369
/// The visible alias `i` resolves to the `input` subcommand.
#[test]
fn test_cli_parse_input_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "i", "my-session", "y"]).unwrap();
    let Commands::Input { name, text } = &parsed.command else {
        panic!("expected Commands::Input, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
    assert_eq!(text.as_deref(), Some("y"));
}
1378
/// A global `--node` given before the subcommand overrides the default address.
#[test]
fn test_cli_parse_custom_node() {
    let argv = ["pulpo", "--node", "win-pc:8080", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert_eq!(parsed.node, "win-pc:8080");
}
1384
/// `--version` makes clap return early with a `DisplayVersion` "error"
/// rather than a parsed `Cli`.
#[test]
fn test_cli_version() {
    let version_exit = Cli::try_parse_from(["pulpo", "--version"]).unwrap_err();
    assert_eq!(
        version_exit.kind(),
        clap::error::ErrorKind::DisplayVersion
    );
}
1392
/// Invoking the binary with no subcommand is rejected by the parser.
#[test]
fn test_cli_parse_no_subcommand_fails() {
    assert!(Cli::try_parse_from(["pulpo"]).is_err());
}
1398
/// The derived `Debug` output of a parsed `Cli` mentions the chosen subcommand.
#[test]
fn test_cli_debug() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    let rendered = format!("{parsed:?}");
    assert!(rendered.contains("List"));
}
1405
/// The unit variant `Commands::List` debug-formats as its bare name.
#[test]
fn test_commands_debug() {
    let rendered = format!("{:?}", Commands::List);
    assert_eq!(rendered, "List");
}
1411
/// Canned JSON for a single running, interactive session named `repo`,
/// served by the mock HTTP server wherever a session payload is expected.
const TEST_SESSION_JSON: &str =
    r#"{"id":"00000000-0000-0000-0000-000000000001","name":"repo","workdir":"/tmp/repo","provider":"claude","prompt":"Fix bug","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
1414
1415 fn test_create_response_json() -> String {
1417 format!(r#"{{"session":{TEST_SESSION_JSON}}}"#)
1418 }
1419
1420 async fn start_test_server() -> String {
1422 use axum::http::StatusCode;
1423 use axum::{
1424 Json, Router,
1425 routing::{get, post},
1426 };
1427
1428 let create_json = test_create_response_json();
1429
1430 let app = Router::new()
1431 .route(
1432 "/api/v1/sessions",
1433 get(|| async { Json::<Vec<()>>(vec![]) }).post(move || async move {
1434 (StatusCode::CREATED, create_json.clone())
1435 }),
1436 )
1437 .route(
1438 "/api/v1/sessions/{id}",
1439 get(|| async { TEST_SESSION_JSON.to_owned() })
1440 .delete(|| async { StatusCode::NO_CONTENT }),
1441 )
1442 .route(
1443 "/api/v1/sessions/{id}/kill",
1444 post(|| async { StatusCode::NO_CONTENT }),
1445 )
1446 .route(
1447 "/api/v1/sessions/{id}/output",
1448 get(|| async { r#"{"output":"test output"}"#.to_owned() }),
1449 )
1450 .route(
1451 "/api/v1/peers",
1452 get(|| async {
1453 r#"{"local":{"name":"test","hostname":"h","os":"macos","arch":"arm64","cpus":8,"memory_mb":0,"gpu":null},"peers":[]}"#.to_owned()
1454 }),
1455 )
1456 .route(
1457 "/api/v1/sessions/{id}/resume",
1458 axum::routing::post(|| async { TEST_SESSION_JSON.to_owned() }),
1459 )
1460 .route(
1461 "/api/v1/sessions/{id}/interventions",
1462 get(|| async { "[]".to_owned() }),
1463 )
1464 .route(
1465 "/api/v1/sessions/{id}/input",
1466 post(|| async { StatusCode::NO_CONTENT }),
1467 );
1468
1469 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1470 let addr = listener.local_addr().unwrap();
1471 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1472 format!("127.0.0.1:{}", addr.port())
1473 }
1474
1475 #[tokio::test]
1476 async fn test_execute_list_success() {
1477 let node = start_test_server().await;
1478 let cli = Cli {
1479 node,
1480 token: None,
1481 command: Commands::List,
1482 };
1483 let result = execute(&cli).await.unwrap();
1484 assert_eq!(result, "No sessions.");
1485 }
1486
1487 #[tokio::test]
1488 async fn test_execute_nodes_success() {
1489 let node = start_test_server().await;
1490 let cli = Cli {
1491 node,
1492 token: None,
1493 command: Commands::Nodes,
1494 };
1495 let result = execute(&cli).await.unwrap();
1496 assert!(result.contains("test"));
1497 assert!(result.contains("(local)"));
1498 assert!(result.contains("NAME"));
1499 }
1500
1501 #[tokio::test]
1502 async fn test_execute_spawn_success() {
1503 let node = start_test_server().await;
1504 let cli = Cli {
1505 node,
1506 token: None,
1507 command: Commands::Spawn {
1508 workdir: Some("/tmp/repo".into()),
1509 name: None,
1510 provider: Some("claude".into()),
1511 auto: false,
1512 unrestricted: false,
1513 model: None,
1514 system_prompt: None,
1515 allowed_tools: None,
1516 ink: None,
1517 max_turns: None,
1518 max_budget: None,
1519 output_format: None,
1520 prompt: vec!["Fix".into(), "bug".into()],
1521 },
1522 };
1523 let result = execute(&cli).await.unwrap();
1524 assert!(result.contains("Created session"));
1525 assert!(result.contains("repo"));
1526 }
1527
1528 #[tokio::test]
1529 async fn test_execute_spawn_with_all_new_flags() {
1530 let node = start_test_server().await;
1531 let cli = Cli {
1532 node,
1533 token: None,
1534 command: Commands::Spawn {
1535 workdir: Some("/tmp/repo".into()),
1536 name: None,
1537 provider: Some("claude".into()),
1538 auto: false,
1539 unrestricted: false,
1540 model: Some("opus".into()),
1541 system_prompt: Some("Be helpful".into()),
1542 allowed_tools: Some(vec!["Read".into(), "Write".into()]),
1543 ink: Some("coder".into()),
1544 max_turns: Some(5),
1545 max_budget: Some(2.5),
1546 output_format: Some("json".into()),
1547 prompt: vec!["Fix".into(), "bug".into()],
1548 },
1549 };
1550 let result = execute(&cli).await.unwrap();
1551 assert!(result.contains("Created session"));
1552 }
1553
1554 #[tokio::test]
1555 async fn test_execute_spawn_auto_mode() {
1556 let node = start_test_server().await;
1557 let cli = Cli {
1558 node,
1559 token: None,
1560 command: Commands::Spawn {
1561 workdir: Some("/tmp/repo".into()),
1562 name: None,
1563 provider: Some("claude".into()),
1564 auto: true,
1565 unrestricted: false,
1566 model: None,
1567 system_prompt: None,
1568 allowed_tools: None,
1569 ink: None,
1570 max_turns: None,
1571 max_budget: None,
1572 output_format: None,
1573 prompt: vec!["Do it".into()],
1574 },
1575 };
1576 let result = execute(&cli).await.unwrap();
1577 assert!(result.contains("Created session"));
1578 }
1579
1580 #[tokio::test]
1581 async fn test_execute_spawn_with_name() {
1582 let node = start_test_server().await;
1583 let cli = Cli {
1584 node,
1585 token: None,
1586 command: Commands::Spawn {
1587 workdir: Some("/tmp/repo".into()),
1588 name: Some("my-task".into()),
1589 provider: Some("claude".into()),
1590 auto: false,
1591 unrestricted: false,
1592 model: None,
1593 system_prompt: None,
1594 allowed_tools: None,
1595 ink: None,
1596 max_turns: None,
1597 max_budget: None,
1598 output_format: None,
1599 prompt: vec!["Fix".into(), "bug".into()],
1600 },
1601 };
1602 let result = execute(&cli).await.unwrap();
1603 assert!(result.contains("Created session"));
1604 }
1605
1606 #[tokio::test]
1607 async fn test_execute_kill_success() {
1608 let node = start_test_server().await;
1609 let cli = Cli {
1610 node,
1611 token: None,
1612 command: Commands::Kill {
1613 name: "test-session".into(),
1614 },
1615 };
1616 let result = execute(&cli).await.unwrap();
1617 assert!(result.contains("killed"));
1618 }
1619
1620 #[tokio::test]
1621 async fn test_execute_delete_success() {
1622 let node = start_test_server().await;
1623 let cli = Cli {
1624 node,
1625 token: None,
1626 command: Commands::Delete {
1627 name: "test-session".into(),
1628 },
1629 };
1630 let result = execute(&cli).await.unwrap();
1631 assert!(result.contains("deleted"));
1632 }
1633
1634 #[tokio::test]
1635 async fn test_execute_logs_success() {
1636 let node = start_test_server().await;
1637 let cli = Cli {
1638 node,
1639 token: None,
1640 command: Commands::Logs {
1641 name: "test-session".into(),
1642 lines: 50,
1643 follow: false,
1644 },
1645 };
1646 let result = execute(&cli).await.unwrap();
1647 assert!(result.contains("test output"));
1648 }
1649
1650 #[tokio::test]
1651 async fn test_execute_list_connection_refused() {
1652 let cli = Cli {
1653 node: "localhost:1".into(),
1654 token: None,
1655 command: Commands::List,
1656 };
1657 let result = execute(&cli).await;
1658 let err = result.unwrap_err().to_string();
1659 assert!(
1660 err.contains("Could not connect to pulpod"),
1661 "Expected friendly error, got: {err}"
1662 );
1663 assert!(err.contains("localhost:1"));
1664 }
1665
1666 #[tokio::test]
1667 async fn test_execute_nodes_connection_refused() {
1668 let cli = Cli {
1669 node: "localhost:1".into(),
1670 token: None,
1671 command: Commands::Nodes,
1672 };
1673 let result = execute(&cli).await;
1674 let err = result.unwrap_err().to_string();
1675 assert!(err.contains("Could not connect to pulpod"));
1676 }
1677
1678 #[tokio::test]
1679 async fn test_execute_kill_error_response() {
1680 use axum::{Router, http::StatusCode, routing::post};
1681
1682 let app = Router::new().route(
1683 "/api/v1/sessions/{id}/kill",
1684 post(|| async {
1685 (
1686 StatusCode::NOT_FOUND,
1687 "{\"error\":\"session not found: test-session\"}",
1688 )
1689 }),
1690 );
1691 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1692 let addr = listener.local_addr().unwrap();
1693 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1694 let node = format!("127.0.0.1:{}", addr.port());
1695
1696 let cli = Cli {
1697 node,
1698 token: None,
1699 command: Commands::Kill {
1700 name: "test-session".into(),
1701 },
1702 };
1703 let err = execute(&cli).await.unwrap_err();
1704 assert_eq!(err.to_string(), "session not found: test-session");
1705 }
1706
1707 #[tokio::test]
1708 async fn test_execute_delete_error_response() {
1709 use axum::{Router, http::StatusCode, routing::delete};
1710
1711 let app = Router::new().route(
1712 "/api/v1/sessions/{id}",
1713 delete(|| async {
1714 (
1715 StatusCode::CONFLICT,
1716 "{\"error\":\"cannot delete session in 'running' state\"}",
1717 )
1718 }),
1719 );
1720 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1721 let addr = listener.local_addr().unwrap();
1722 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1723 let node = format!("127.0.0.1:{}", addr.port());
1724
1725 let cli = Cli {
1726 node,
1727 token: None,
1728 command: Commands::Delete {
1729 name: "test-session".into(),
1730 },
1731 };
1732 let err = execute(&cli).await.unwrap_err();
1733 assert_eq!(err.to_string(), "cannot delete session in 'running' state");
1734 }
1735
1736 #[tokio::test]
1737 async fn test_execute_logs_error_response() {
1738 use axum::{Router, http::StatusCode, routing::get};
1739
1740 let app = Router::new().route(
1741 "/api/v1/sessions/{id}/output",
1742 get(|| async {
1743 (
1744 StatusCode::NOT_FOUND,
1745 "{\"error\":\"session not found: ghost\"}",
1746 )
1747 }),
1748 );
1749 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1750 let addr = listener.local_addr().unwrap();
1751 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1752 let node = format!("127.0.0.1:{}", addr.port());
1753
1754 let cli = Cli {
1755 node,
1756 token: None,
1757 command: Commands::Logs {
1758 name: "ghost".into(),
1759 lines: 50,
1760 follow: false,
1761 },
1762 };
1763 let err = execute(&cli).await.unwrap_err();
1764 assert_eq!(err.to_string(), "session not found: ghost");
1765 }
1766
1767 #[tokio::test]
1768 async fn test_execute_resume_error_response() {
1769 use axum::{Router, http::StatusCode, routing::post};
1770
1771 let app = Router::new().route(
1772 "/api/v1/sessions/{id}/resume",
1773 post(|| async {
1774 (
1775 StatusCode::BAD_REQUEST,
1776 "{\"error\":\"session is not stale (status: running)\"}",
1777 )
1778 }),
1779 );
1780 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1781 let addr = listener.local_addr().unwrap();
1782 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1783 let node = format!("127.0.0.1:{}", addr.port());
1784
1785 let cli = Cli {
1786 node,
1787 token: None,
1788 command: Commands::Resume {
1789 name: "test-session".into(),
1790 },
1791 };
1792 let err = execute(&cli).await.unwrap_err();
1793 assert_eq!(err.to_string(), "session is not stale (status: running)");
1794 }
1795
1796 #[tokio::test]
1797 async fn test_execute_spawn_error_response() {
1798 use axum::{Router, http::StatusCode, routing::post};
1799
1800 let app = Router::new().route(
1801 "/api/v1/sessions",
1802 post(|| async {
1803 (
1804 StatusCode::INTERNAL_SERVER_ERROR,
1805 "{\"error\":\"failed to spawn session\"}",
1806 )
1807 }),
1808 );
1809 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1810 let addr = listener.local_addr().unwrap();
1811 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1812 let node = format!("127.0.0.1:{}", addr.port());
1813
1814 let cli = Cli {
1815 node,
1816 token: None,
1817 command: Commands::Spawn {
1818 workdir: Some("/tmp/repo".into()),
1819 name: None,
1820 provider: Some("claude".into()),
1821 auto: false,
1822 unrestricted: false,
1823 model: None,
1824 system_prompt: None,
1825 allowed_tools: None,
1826 ink: None,
1827 max_turns: None,
1828 max_budget: None,
1829 output_format: None,
1830 prompt: vec!["test".into()],
1831 },
1832 };
1833 let err = execute(&cli).await.unwrap_err();
1834 assert_eq!(err.to_string(), "failed to spawn session");
1835 }
1836
1837 #[tokio::test]
1838 async fn test_execute_interventions_error_response() {
1839 use axum::{Router, http::StatusCode, routing::get};
1840
1841 let app = Router::new().route(
1842 "/api/v1/sessions/{id}/interventions",
1843 get(|| async {
1844 (
1845 StatusCode::NOT_FOUND,
1846 "{\"error\":\"session not found: ghost\"}",
1847 )
1848 }),
1849 );
1850 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1851 let addr = listener.local_addr().unwrap();
1852 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1853 let node = format!("127.0.0.1:{}", addr.port());
1854
1855 let cli = Cli {
1856 node,
1857 token: None,
1858 command: Commands::Interventions {
1859 name: "ghost".into(),
1860 },
1861 };
1862 let err = execute(&cli).await.unwrap_err();
1863 assert_eq!(err.to_string(), "session not found: ghost");
1864 }
1865
1866 #[tokio::test]
1867 async fn test_execute_resume_success() {
1868 let node = start_test_server().await;
1869 let cli = Cli {
1870 node,
1871 token: None,
1872 command: Commands::Resume {
1873 name: "test-session".into(),
1874 },
1875 };
1876 let result = execute(&cli).await.unwrap();
1877 assert!(result.contains("Resumed session"));
1878 assert!(result.contains("repo"));
1879 }
1880
1881 #[tokio::test]
1882 async fn test_execute_input_success() {
1883 let node = start_test_server().await;
1884 let cli = Cli {
1885 node,
1886 token: None,
1887 command: Commands::Input {
1888 name: "test-session".into(),
1889 text: Some("yes".into()),
1890 },
1891 };
1892 let result = execute(&cli).await.unwrap();
1893 assert!(result.contains("Sent input to session test-session"));
1894 }
1895
1896 #[tokio::test]
1897 async fn test_execute_input_no_text() {
1898 let node = start_test_server().await;
1899 let cli = Cli {
1900 node,
1901 token: None,
1902 command: Commands::Input {
1903 name: "test-session".into(),
1904 text: None,
1905 },
1906 };
1907 let result = execute(&cli).await.unwrap();
1908 assert!(result.contains("Sent input to session test-session"));
1909 }
1910
1911 #[tokio::test]
1912 async fn test_execute_input_connection_refused() {
1913 let cli = Cli {
1914 node: "localhost:1".into(),
1915 token: None,
1916 command: Commands::Input {
1917 name: "test".into(),
1918 text: Some("y".into()),
1919 },
1920 };
1921 let result = execute(&cli).await;
1922 let err = result.unwrap_err().to_string();
1923 assert!(err.contains("Could not connect to pulpod"));
1924 }
1925
1926 #[tokio::test]
1927 async fn test_execute_input_error_response() {
1928 use axum::{Router, http::StatusCode, routing::post};
1929
1930 let app = Router::new().route(
1931 "/api/v1/sessions/{id}/input",
1932 post(|| async {
1933 (
1934 StatusCode::NOT_FOUND,
1935 "{\"error\":\"session not found: ghost\"}",
1936 )
1937 }),
1938 );
1939 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1940 let addr = listener.local_addr().unwrap();
1941 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
1942 let node = format!("127.0.0.1:{}", addr.port());
1943
1944 let cli = Cli {
1945 node,
1946 token: None,
1947 command: Commands::Input {
1948 name: "ghost".into(),
1949 text: Some("y".into()),
1950 },
1951 };
1952 let err = execute(&cli).await.unwrap_err();
1953 assert_eq!(err.to_string(), "session not found: ghost");
1954 }
1955
1956 #[tokio::test]
1957 async fn test_execute_ui() {
1958 let cli = Cli {
1959 node: "localhost:7433".into(),
1960 token: None,
1961 command: Commands::Ui,
1962 };
1963 let result = execute(&cli).await.unwrap();
1964 assert!(result.contains("Opening"));
1965 assert!(result.contains("http://localhost:7433"));
1966 }
1967
1968 #[tokio::test]
1969 async fn test_execute_ui_custom_node() {
1970 let cli = Cli {
1971 node: "mac-mini:7433".into(),
1972 token: None,
1973 command: Commands::Ui,
1974 };
1975 let result = execute(&cli).await.unwrap();
1976 assert!(result.contains("http://mac-mini:7433"));
1977 }
1978
/// An empty session list formats to the fixed "No sessions." message.
#[test]
fn test_format_sessions_empty() {
    let rendered = format_sessions(&[]);
    assert_eq!(rendered, "No sessions.");
}
1983
/// A single running Claude session is rendered with a header row plus its
/// name, status, provider and (short) prompt.
#[test]
fn test_format_sessions_with_data() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "my-api".into(),
        workdir: "/tmp/repo".into(),
        provider: Provider::Claude,
        prompt: "Fix the bug".into(),
        status: SessionStatus::Running,
        mode: SessionMode::Interactive,
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        waiting_for_input: false,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    let table = format_sessions(&[session]);
    for fragment in ["NAME", "my-api", "running", "claude", "Fix the bug"] {
        assert!(table.contains(fragment));
    }
}
2026
/// A prompt longer than the table's prompt column is shortened with an
/// ellipsis (`...`) in the rendered output.
#[test]
fn test_format_sessions_long_prompt_truncated() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "test".into(),
        workdir: "/tmp".into(),
        provider: Provider::Codex,
        prompt: "A very long prompt that exceeds forty characters in total length".into(),
        status: SessionStatus::Completed,
        mode: SessionMode::Autonomous,
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        waiting_for_input: false,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    let table = format_sessions(&[session]);
    assert!(table.contains("..."));
}
2065
/// A running session flagged `waiting_for_input` is displayed as "waiting"
/// and the word "running" does not appear anywhere in the output.
#[test]
fn test_format_sessions_waiting_for_input() {
    use chrono::Utc;
    use pulpo_common::session::{Provider, SessionMode, SessionStatus};
    use uuid::Uuid;

    let session = Session {
        id: Uuid::nil(),
        name: "blocked".into(),
        workdir: "/tmp".into(),
        provider: Provider::Claude,
        prompt: "Fix bug".into(),
        status: SessionStatus::Running,
        mode: SessionMode::Interactive,
        conversation_id: None,
        exit_code: None,
        backend_session_id: None,
        output_snapshot: None,
        guard_config: None,
        model: None,
        allowed_tools: None,
        system_prompt: None,
        metadata: None,
        ink: None,
        max_turns: None,
        max_budget_usd: None,
        output_format: None,
        intervention_reason: None,
        intervention_at: None,
        last_output_at: None,
        idle_since: None,
        waiting_for_input: true,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    let table = format_sessions(&[session]);
    assert!(table.contains("waiting"));
    assert!(!table.contains("running"));
}
2105
/// The node table lists the local node with its `(local)` marker, the
/// configured peer, and the peer's session count.
#[test]
fn test_format_nodes() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "mac-mini".into(),
        hostname: "h".into(),
        os: "macos".into(),
        arch: "arm64".into(),
        cpus: 8,
        memory_mb: 16384,
        gpu: None,
    };
    let peer = PeerInfo {
        name: "win-pc".into(),
        address: "win-pc:7433".into(),
        status: PeerStatus::Online,
        node_info: None,
        session_count: Some(3),
        source: PeerSource::Configured,
    };
    let resp = PeersResponse {
        local,
        peers: vec![peer],
    };

    let table = format_nodes(&resp);
    for fragment in ["mac-mini", "(local)", "win-pc"] {
        assert!(table.contains(fragment));
    }
    // NOTE(review): '3' also occurs in the peer address "win-pc:7433"; if
    // `format_nodes` prints addresses this check cannot fail on its own — confirm.
    assert!(table.contains('3'));
}
2136
/// An offline peer with no session count is shown as "offline", and the
/// third output line contains a `-`.
#[test]
fn test_format_nodes_no_session_count() {
    use pulpo_common::node::NodeInfo;
    use pulpo_common::peer::{PeerInfo, PeerSource, PeerStatus};

    let local = NodeInfo {
        name: "local".into(),
        hostname: "h".into(),
        os: "linux".into(),
        arch: "x86_64".into(),
        cpus: 4,
        memory_mb: 8192,
        gpu: None,
    };
    let peer = PeerInfo {
        name: "peer".into(),
        address: "peer:7433".into(),
        status: PeerStatus::Offline,
        node_info: None,
        session_count: None,
        source: PeerSource::Configured,
    };
    let resp = PeersResponse {
        local,
        peers: vec![peer],
    };

    let table = format_nodes(&resp);
    assert!(table.contains("offline"));
    // Third line — presumably the peer row after the header and the local
    // node row, with `-` as the missing-count placeholder; confirm against
    // `format_nodes`.
    let third_line = table.lines().nth(2).unwrap();
    assert!(third_line.contains('-'));
}
2167
2168 #[tokio::test]
2169 async fn test_execute_resume_connection_refused() {
2170 let cli = Cli {
2171 node: "localhost:1".into(),
2172 token: None,
2173 command: Commands::Resume {
2174 name: "test".into(),
2175 },
2176 };
2177 let result = execute(&cli).await;
2178 let err = result.unwrap_err().to_string();
2179 assert!(err.contains("Could not connect to pulpod"));
2180 }
2181
2182 #[tokio::test]
2183 async fn test_execute_spawn_connection_refused() {
2184 let cli = Cli {
2185 node: "localhost:1".into(),
2186 token: None,
2187 command: Commands::Spawn {
2188 workdir: Some("/tmp".into()),
2189 name: None,
2190 provider: Some("claude".into()),
2191 auto: false,
2192 unrestricted: false,
2193 model: None,
2194 system_prompt: None,
2195 allowed_tools: None,
2196 ink: None,
2197 max_turns: None,
2198 max_budget: None,
2199 output_format: None,
2200 prompt: vec!["test".into()],
2201 },
2202 };
2203 let result = execute(&cli).await;
2204 let err = result.unwrap_err().to_string();
2205 assert!(err.contains("Could not connect to pulpod"));
2206 }
2207
2208 #[tokio::test]
2209 async fn test_execute_kill_connection_refused() {
2210 let cli = Cli {
2211 node: "localhost:1".into(),
2212 token: None,
2213 command: Commands::Kill {
2214 name: "test".into(),
2215 },
2216 };
2217 let result = execute(&cli).await;
2218 let err = result.unwrap_err().to_string();
2219 assert!(err.contains("Could not connect to pulpod"));
2220 }
2221
2222 #[tokio::test]
2223 async fn test_execute_delete_connection_refused() {
2224 let cli = Cli {
2225 node: "localhost:1".into(),
2226 token: None,
2227 command: Commands::Delete {
2228 name: "test".into(),
2229 },
2230 };
2231 let result = execute(&cli).await;
2232 let err = result.unwrap_err().to_string();
2233 assert!(err.contains("Could not connect to pulpod"));
2234 }
2235
2236 #[tokio::test]
2237 async fn test_execute_logs_connection_refused() {
2238 let cli = Cli {
2239 node: "localhost:1".into(),
2240 token: None,
2241 command: Commands::Logs {
2242 name: "test".into(),
2243 lines: 50,
2244 follow: false,
2245 },
2246 };
2247 let result = execute(&cli).await;
2248 let err = result.unwrap_err().to_string();
2249 assert!(err.contains("Could not connect to pulpod"));
2250 }
2251
2252 #[tokio::test]
2253 async fn test_friendly_error_connect() {
2254 let err = reqwest::Client::new()
2256 .get("http://127.0.0.1:1")
2257 .send()
2258 .await
2259 .unwrap_err();
2260 let friendly = friendly_error(&err, "test-node:1");
2261 let msg = friendly.to_string();
2262 assert!(
2263 msg.contains("Could not connect"),
2264 "Expected connect message, got: {msg}"
2265 );
2266 }
2267
2268 #[tokio::test]
2269 async fn test_friendly_error_other() {
2270 let err = reqwest::Client::new()
2272 .get("http://[::invalid::url")
2273 .send()
2274 .await
2275 .unwrap_err();
2276 let friendly = friendly_error(&err, "bad-host");
2277 let msg = friendly.to_string();
2278 assert!(
2279 msg.contains("Network error"),
2280 "Expected network error message, got: {msg}"
2281 );
2282 assert!(msg.contains("bad-host"));
2283 }
2284
/// `is_localhost` accepts `localhost`, `127.0.0.1` and `::1` (with or without
/// a port) and rejects other hostnames and LAN addresses.
#[test]
fn test_is_localhost_variants() {
    let local_nodes = [
        "localhost:7433",
        "127.0.0.1:7433",
        "[::1]:7433",
        "::1",
        "localhost",
    ];
    for node in local_nodes {
        assert!(is_localhost(node), "{node} should count as localhost");
    }

    let remote_nodes = ["mac-mini:7433", "192.168.1.100:7433"];
    for node in remote_nodes {
        assert!(!is_localhost(node), "{node} should not count as localhost");
    }
}
2297
/// With a token, `authed_get` attaches an `Authorization: Bearer <token>` header.
#[test]
fn test_authed_get_with_token() {
    let client = reqwest::Client::new();
    let request = authed_get(&client, "http://h:1/api".into(), Some("tok"))
        .build()
        .unwrap();
    let header = request
        .headers()
        .get("authorization")
        .map(|value| value.to_str().unwrap());
    assert_eq!(header, Some("Bearer tok"));
}
2312
/// Without a token, `authed_get` sends no `Authorization` header.
#[test]
fn test_authed_get_without_token() {
    let client = reqwest::Client::new();
    let request = authed_get(&client, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert_eq!(request.headers().get("authorization"), None);
}
2321
/// With a token, `authed_post` attaches an `Authorization: Bearer <token>` header.
#[test]
fn test_authed_post_with_token() {
    let client = reqwest::Client::new();
    let request = authed_post(&client, "http://h:1/api".into(), Some("secret"))
        .build()
        .unwrap();
    let header = request
        .headers()
        .get("authorization")
        .map(|value| value.to_str().unwrap());
    assert_eq!(header, Some("Bearer secret"));
}
2336
/// Without a token, `authed_post` sends no `Authorization` header.
#[test]
fn test_authed_post_without_token() {
    let client = reqwest::Client::new();
    let request = authed_post(&client, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert_eq!(request.headers().get("authorization"), None);
}
2345
/// With a token, `authed_delete` attaches an `Authorization: Bearer <token>` header.
#[test]
fn test_authed_delete_with_token() {
    let client = reqwest::Client::new();
    let request = authed_delete(&client, "http://h:1/api".into(), Some("del-tok"))
        .build()
        .unwrap();
    let header = request
        .headers()
        .get("authorization")
        .map(|value| value.to_str().unwrap());
    assert_eq!(header, Some("Bearer del-tok"));
}
2360
/// Without a token, `authed_delete` sends no `Authorization` header.
#[test]
fn test_authed_delete_without_token() {
    let client = reqwest::Client::new();
    let request = authed_delete(&client, "http://h:1/api".into(), None)
        .build()
        .unwrap();
    assert_eq!(request.headers().get("authorization"), None);
}
2369
2370 #[tokio::test]
2371 async fn test_resolve_token_explicit() {
2372 let client = reqwest::Client::new();
2373 let token =
2374 resolve_token(&client, "http://localhost:1", "localhost:1", Some("my-tok")).await;
2375 assert_eq!(token, Some("my-tok".into()));
2376 }
2377
2378 #[tokio::test]
2379 async fn test_resolve_token_remote_no_explicit() {
2380 let client = reqwest::Client::new();
2381 let token = resolve_token(&client, "http://remote:7433", "remote:7433", None).await;
2382 assert_eq!(token, None);
2383 }
2384
2385 #[tokio::test]
2386 async fn test_resolve_token_localhost_auto_discover() {
2387 use axum::{Json, Router, routing::get};
2388
2389 let app = Router::new().route(
2390 "/api/v1/auth/token",
2391 get(|| async {
2392 Json(AuthTokenResponse {
2393 token: "discovered".into(),
2394 })
2395 }),
2396 );
2397 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2398 let addr = listener.local_addr().unwrap();
2399 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2400
2401 let node = format!("localhost:{}", addr.port());
2402 let base = base_url(&node);
2403 let client = reqwest::Client::new();
2404 let token = resolve_token(&client, &base, &node, None).await;
2405 assert_eq!(token, Some("discovered".into()));
2406 }
2407
2408 #[tokio::test]
2409 async fn test_discover_token_empty_returns_none() {
2410 use axum::{Json, Router, routing::get};
2411
2412 let app = Router::new().route(
2413 "/api/v1/auth/token",
2414 get(|| async {
2415 Json(AuthTokenResponse {
2416 token: String::new(),
2417 })
2418 }),
2419 );
2420 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2421 let addr = listener.local_addr().unwrap();
2422 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2423
2424 let base = format!("http://127.0.0.1:{}", addr.port());
2425 let client = reqwest::Client::new();
2426 assert_eq!(discover_token(&client, &base).await, None);
2427 }
2428
2429 #[tokio::test]
2430 async fn test_discover_token_unreachable_returns_none() {
2431 let client = reqwest::Client::new();
2432 assert_eq!(discover_token(&client, "http://127.0.0.1:1").await, None);
2433 }
2434
/// A global `--token` value is stored on the parsed `Cli`.
#[test]
fn test_cli_parse_with_token() {
    let argv = ["pulpo", "--token", "my-secret", "list"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    assert_eq!(parsed.token.as_deref(), Some("my-secret"));
}
2440
/// When `--token` is not given, the parsed token is `None`.
#[test]
fn test_cli_parse_without_token() {
    let parsed = Cli::try_parse_from(["pulpo", "list"]).unwrap();
    assert!(parsed.token.is_none());
}
2446
2447 #[tokio::test]
2448 async fn test_execute_with_explicit_token_sends_header() {
2449 use axum::{Router, extract::Request, http::StatusCode, routing::get};
2450
2451 let app = Router::new().route(
2452 "/api/v1/sessions",
2453 get(|req: Request| async move {
2454 let auth = req
2455 .headers()
2456 .get("authorization")
2457 .and_then(|v| v.to_str().ok())
2458 .unwrap_or("");
2459 assert_eq!(auth, "Bearer test-token");
2460 (StatusCode::OK, "[]".to_owned())
2461 }),
2462 );
2463 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2464 let addr = listener.local_addr().unwrap();
2465 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2466 let node = format!("127.0.0.1:{}", addr.port());
2467
2468 let cli = Cli {
2469 node,
2470 token: Some("test-token".into()),
2471 command: Commands::List,
2472 };
2473 let result = execute(&cli).await.unwrap();
2474 assert_eq!(result, "No sessions.");
2475 }
2476
/// `pulpo interventions <name>` captures the session name.
#[test]
fn test_cli_parse_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "interventions", "my-session"]).unwrap();
    let Commands::Interventions { name } = &parsed.command else {
        panic!("expected Commands::Interventions, got {:?}", parsed.command);
    };
    assert_eq!(name, "my-session");
}
2487
/// An empty event list renders as a fixed placeholder message.
#[test]
fn test_format_interventions_empty() {
    let rendered = format_interventions(&[]);
    assert_eq!(rendered, "No intervention events.");
}
2492
/// A non-empty event list renders column headers plus each event's reason and timestamp.
#[test]
fn test_format_interventions_with_data() {
    let first = InterventionEventResponse {
        id: 1,
        session_id: "sess-1".into(),
        reason: "Memory exceeded threshold".into(),
        created_at: "2026-01-01T00:00:00Z".into(),
    };
    let second = InterventionEventResponse {
        id: 2,
        session_id: "sess-1".into(),
        reason: "Idle for 10 minutes".into(),
        created_at: "2026-01-02T00:00:00Z".into(),
    };
    let rendered = format_interventions(&[first, second]);

    let expected_fragments = [
        "ID",
        "TIMESTAMP",
        "REASON",
        "Memory exceeded threshold",
        "Idle for 10 minutes",
        "2026-01-01T00:00:00Z",
    ];
    for fragment in expected_fragments {
        assert!(rendered.contains(fragment), "missing {fragment:?} in output");
    }
}
2517
2518 #[tokio::test]
2519 async fn test_execute_interventions_empty() {
2520 let node = start_test_server().await;
2521 let cli = Cli {
2522 node,
2523 token: None,
2524 command: Commands::Interventions {
2525 name: "my-session".into(),
2526 },
2527 };
2528 let result = execute(&cli).await.unwrap();
2529 assert_eq!(result, "No intervention events.");
2530 }
2531
2532 #[tokio::test]
2533 async fn test_execute_interventions_with_data() {
2534 use axum::{Router, routing::get};
2535
2536 let app = Router::new().route(
2537 "/api/v1/sessions/{id}/interventions",
2538 get(|| async {
2539 r#"[{"id":1,"session_id":"s","reason":"OOM","created_at":"2026-01-01T00:00:00Z"}]"#
2540 .to_owned()
2541 }),
2542 );
2543 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2544 let addr = listener.local_addr().unwrap();
2545 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2546 let node = format!("127.0.0.1:{}", addr.port());
2547
2548 let cli = Cli {
2549 node,
2550 token: None,
2551 command: Commands::Interventions {
2552 name: "test".into(),
2553 },
2554 };
2555 let result = execute(&cli).await.unwrap();
2556 assert!(result.contains("OOM"));
2557 assert!(result.contains("2026-01-01T00:00:00Z"));
2558 }
2559
2560 #[tokio::test]
2561 async fn test_execute_interventions_connection_refused() {
2562 let cli = Cli {
2563 node: "localhost:1".into(),
2564 token: None,
2565 command: Commands::Interventions {
2566 name: "test".into(),
2567 },
2568 };
2569 let result = execute(&cli).await;
2570 let err = result.unwrap_err().to_string();
2571 assert!(err.contains("Could not connect to pulpod"));
2572 }
2573
/// The attach command runs `tmux attach-session -t <session>`.
#[test]
fn test_build_attach_command() {
    let command = build_attach_command("my-session");
    assert_eq!(command.get_program(), "tmux");

    let argv = command.get_args().collect::<Vec<&std::ffi::OsStr>>();
    assert_eq!(argv, vec!["attach-session", "-t", "my-session"]);
}
2583
/// `attach <name>` parses into `Commands::Attach` carrying the session name.
#[test]
fn test_cli_parse_attach() {
    let parsed = Cli::try_parse_from(["pulpo", "attach", "my-session"]).unwrap();
    let Commands::Attach { name } = &parsed.command else {
        panic!("expected Attach command");
    };
    assert_eq!(name, "my-session");
}
2592
/// The short alias `a` resolves to `Commands::Attach`.
#[test]
fn test_cli_parse_attach_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "a", "my-session"]).unwrap();
    let Commands::Attach { name } = &parsed.command else {
        panic!("expected Attach command");
    };
    assert_eq!(name, "my-session");
}
2601
2602 #[tokio::test]
2603 async fn test_execute_attach_success() {
2604 let node = start_test_server().await;
2605 let cli = Cli {
2606 node,
2607 token: None,
2608 command: Commands::Attach {
2609 name: "test-session".into(),
2610 },
2611 };
2612 let result = execute(&cli).await.unwrap();
2613 assert!(result.contains("Detached from session test-session"));
2614 }
2615
2616 #[tokio::test]
2617 async fn test_execute_attach_with_backend_session_id() {
2618 use axum::{Router, routing::get};
2619 let session_json = r#"{"id":"00000000-0000-0000-0000-000000000002","name":"my-session","workdir":"/tmp","provider":"claude","prompt":"test","status":"running","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":"my-session","output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#;
2620 let app = Router::new().route(
2621 "/api/v1/sessions/{id}",
2622 get(move || async move { session_json.to_owned() }),
2623 );
2624 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2625 let addr = listener.local_addr().unwrap();
2626 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2627
2628 let cli = Cli {
2629 node: format!("127.0.0.1:{}", addr.port()),
2630 token: None,
2631 command: Commands::Attach {
2632 name: "my-session".into(),
2633 },
2634 };
2635 let result = execute(&cli).await.unwrap();
2636 assert!(result.contains("Detached from session my-session"));
2637 }
2638
2639 #[tokio::test]
2640 async fn test_execute_attach_connection_refused() {
2641 let cli = Cli {
2642 node: "localhost:1".into(),
2643 token: None,
2644 command: Commands::Attach {
2645 name: "test-session".into(),
2646 },
2647 };
2648 let result = execute(&cli).await;
2649 let err = result.unwrap_err().to_string();
2650 assert!(err.contains("Could not connect to pulpod"));
2651 }
2652
2653 #[tokio::test]
2654 async fn test_execute_attach_error_response() {
2655 use axum::{Router, http::StatusCode, routing::get};
2656 let app = Router::new().route(
2657 "/api/v1/sessions/{id}",
2658 get(|| async {
2659 (
2660 StatusCode::NOT_FOUND,
2661 r#"{"error":"session not found"}"#.to_owned(),
2662 )
2663 }),
2664 );
2665 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2666 let addr = listener.local_addr().unwrap();
2667 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2668
2669 let cli = Cli {
2670 node: format!("127.0.0.1:{}", addr.port()),
2671 token: None,
2672 command: Commands::Attach {
2673 name: "nonexistent".into(),
2674 },
2675 };
2676 let result = execute(&cli).await;
2677 let err = result.unwrap_err().to_string();
2678 assert!(err.contains("session not found"));
2679 }
2680
/// The short alias `s` resolves to `Commands::Spawn`.
#[test]
fn test_cli_parse_alias_spawn() {
    let argv = ["pulpo", "s", "--workdir", "/tmp", "Do it"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let is_spawn = matches!(parsed.command, Commands::Spawn { .. });
    assert!(is_spawn);
}
2688
/// The alias `ls` resolves to `Commands::List`.
#[test]
fn test_cli_parse_alias_list() {
    let parsed = Cli::try_parse_from(["pulpo", "ls"]).unwrap();
    let is_list = matches!(parsed.command, Commands::List);
    assert!(is_list);
}
2694
/// The short alias `l` resolves to `Commands::Logs` with the given session name.
#[test]
fn test_cli_parse_alias_logs() {
    let parsed = Cli::try_parse_from(["pulpo", "l", "my-session"]).unwrap();
    let Commands::Logs { name, .. } = &parsed.command else {
        panic!("expected Logs command");
    };
    assert_eq!(name, "my-session");
}
2703
/// The short alias `k` resolves to `Commands::Kill` with the given session name.
#[test]
fn test_cli_parse_alias_kill() {
    let parsed = Cli::try_parse_from(["pulpo", "k", "my-session"]).unwrap();
    let Commands::Kill { name } = &parsed.command else {
        panic!("expected Kill command");
    };
    assert_eq!(name, "my-session");
}
2712
/// The alias `rm` resolves to `Commands::Delete` with the given session name.
#[test]
fn test_cli_parse_alias_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "rm", "my-session"]).unwrap();
    let Commands::Delete { name } = &parsed.command else {
        panic!("expected Delete command");
    };
    assert_eq!(name, "my-session");
}
2721
/// The short alias `r` resolves to `Commands::Resume` with the given session name.
#[test]
fn test_cli_parse_alias_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "r", "my-session"]).unwrap();
    let Commands::Resume { name } = &parsed.command else {
        panic!("expected Resume command");
    };
    assert_eq!(name, "my-session");
}
2730
/// The short alias `n` resolves to `Commands::Nodes`.
#[test]
fn test_cli_parse_alias_nodes() {
    let parsed = Cli::try_parse_from(["pulpo", "n"]).unwrap();
    let is_nodes = matches!(parsed.command, Commands::Nodes);
    assert!(is_nodes);
}
2736
/// The alias `iv` resolves to `Commands::Interventions` with the given session name.
#[test]
fn test_cli_parse_alias_interventions() {
    let parsed = Cli::try_parse_from(["pulpo", "iv", "my-session"]).unwrap();
    let Commands::Interventions { name } = &parsed.command else {
        panic!("expected Interventions command");
    };
    assert_eq!(name, "my-session");
}
2745
/// A JSON body with an `error` field is reduced to that field's text.
#[test]
fn test_api_error_json() {
    let message = api_error(r#"{"error":"session not found: foo"}"#).to_string();
    assert_eq!(message, "session not found: foo");
}
2751
/// A non-JSON body is passed through verbatim as the error message.
#[test]
fn test_api_error_plain_text() {
    let message = api_error("plain text error").to_string();
    assert_eq!(message, "plain text error");
}
2757
/// With no previous output, the whole new output is returned.
#[test]
fn test_diff_output_empty_prev() {
    let fresh = "line1\nline2\n";
    assert_eq!(diff_output("", fresh), "line1\nline2\n");
}
2764
/// Identical previous and new output produce an empty diff.
#[test]
fn test_diff_output_identical() {
    let snapshot = "line1\nline2";
    assert_eq!(diff_output(snapshot, snapshot), "");
}
2769
/// When new output extends the previous one, only the appended lines are returned.
#[test]
fn test_diff_output_new_lines_appended() {
    let before = "line1\nline2";
    let after = "line1\nline2\nline3\nline4";
    let delta = diff_output(before, after);
    assert_eq!(delta, "line3\nline4");
}
2776
/// When the window scrolls (the first line drops off, one is added), only the new line is returned.
#[test]
fn test_diff_output_scrolled_window() {
    let before = "line1\nline2\nline3";
    let after = "line2\nline3\nline4";
    let delta = diff_output(before, after);
    assert_eq!(delta, "line4");
}
2784
/// With no overlap between previous and new output, the whole new output is returned.
#[test]
fn test_diff_output_completely_different() {
    let before = "aaa\nbbb";
    let after = "xxx\nyyy";
    let delta = diff_output(before, after);
    assert_eq!(delta, "xxx\nyyy");
}
2791
/// The previous last line ("common") reappears in the new output, but the line before it
/// differs, so the whole new output is returned.
#[test]
fn test_diff_output_last_line_matches_but_overlap_fails() {
    let before = "aaa\ncommon";
    let after = "zzz\ncommon\nnew_line";
    let delta = diff_output(before, after);
    assert_eq!(delta, "zzz\ncommon\nnew_line");
}
2802
/// Empty new output produces an empty diff.
#[test]
fn test_diff_output_new_empty() {
    let delta = diff_output("line1", "");
    assert_eq!(delta, "");
}
2807
2808 async fn start_follow_test_server() -> String {
2812 use axum::{Router, extract::Path, extract::Query, routing::get};
2813 use std::sync::Arc;
2814 use std::sync::atomic::{AtomicUsize, Ordering};
2815
2816 let call_count = Arc::new(AtomicUsize::new(0));
2817 let output_count = call_count.clone();
2818 let status_count = Arc::new(AtomicUsize::new(0));
2819 let status_count_inner = status_count.clone();
2820
2821 let app = Router::new()
2822 .route(
2823 "/api/v1/sessions/{id}/output",
2824 get(
2825 move |_path: Path<String>,
2826 _query: Query<std::collections::HashMap<String, String>>| {
2827 let count = output_count.clone();
2828 async move {
2829 let n = count.fetch_add(1, Ordering::SeqCst);
2830 let output = match n {
2831 0 => "line1\nline2".to_owned(),
2832 1 => "line1\nline2\nline3".to_owned(),
2833 _ => "line2\nline3\nline4".to_owned(),
2834 };
2835 format!(r#"{{"output":{}}}"#, serde_json::json!(output))
2836 }
2837 },
2838 ),
2839 )
2840 .route(
2841 "/api/v1/sessions/{id}",
2842 get(move |_path: Path<String>| {
2843 let count = status_count_inner.clone();
2844 async move {
2845 let n = count.fetch_add(1, Ordering::SeqCst);
2846 let status = if n < 2 { "running" } else { "completed" };
2847 format!(
2848 r#"{{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"{status}","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}}"#
2849 )
2850 }
2851 }),
2852 );
2853
2854 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2855 let addr = listener.local_addr().unwrap();
2856 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2857 format!("http://127.0.0.1:{}", addr.port())
2858 }
2859
2860 #[tokio::test]
2861 async fn test_follow_logs_polls_and_exits_on_completed() {
2862 let base = start_follow_test_server().await;
2863 let client = reqwest::Client::new();
2864 let mut buf = Vec::new();
2865
2866 follow_logs(&client, &base, "test", 100, None, &mut buf)
2867 .await
2868 .unwrap();
2869
2870 let output = String::from_utf8(buf).unwrap();
2871 assert!(output.contains("line1"));
2873 assert!(output.contains("line2"));
2874 assert!(output.contains("line3"));
2875 assert!(output.contains("line4"));
2876 }
2877
2878 #[tokio::test]
2879 async fn test_execute_logs_follow_success() {
2880 let base = start_follow_test_server().await;
2881 let node = base.strip_prefix("http://").unwrap().to_owned();
2883
2884 let cli = Cli {
2885 node,
2886 token: None,
2887 command: Commands::Logs {
2888 name: "test".into(),
2889 lines: 100,
2890 follow: true,
2891 },
2892 };
2893 let result = execute(&cli).await.unwrap();
2895 assert_eq!(result, "");
2896 }
2897
2898 #[tokio::test]
2899 async fn test_execute_logs_follow_connection_refused() {
2900 let cli = Cli {
2901 node: "localhost:1".into(),
2902 token: None,
2903 command: Commands::Logs {
2904 name: "test".into(),
2905 lines: 50,
2906 follow: true,
2907 },
2908 };
2909 let result = execute(&cli).await;
2910 let err = result.unwrap_err().to_string();
2911 assert!(
2912 err.contains("Could not connect to pulpod"),
2913 "Expected friendly error, got: {err}"
2914 );
2915 }
2916
2917 #[tokio::test]
2918 async fn test_follow_logs_exits_on_dead() {
2919 use axum::{Router, extract::Path, extract::Query, routing::get};
2920
2921 let app = Router::new()
2922 .route(
2923 "/api/v1/sessions/{id}/output",
2924 get(
2925 |_path: Path<String>,
2926 _query: Query<std::collections::HashMap<String, String>>| async {
2927 r#"{"output":"some output"}"#.to_owned()
2928 },
2929 ),
2930 )
2931 .route(
2932 "/api/v1/sessions/{id}",
2933 get(|_path: Path<String>| async {
2934 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"dead","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2935 }),
2936 );
2937
2938 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2939 let addr = listener.local_addr().unwrap();
2940 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2941 let base = format!("http://127.0.0.1:{}", addr.port());
2942
2943 let client = reqwest::Client::new();
2944 let mut buf = Vec::new();
2945 follow_logs(&client, &base, "test", 100, None, &mut buf)
2946 .await
2947 .unwrap();
2948
2949 let output = String::from_utf8(buf).unwrap();
2950 assert!(output.contains("some output"));
2951 }
2952
2953 #[tokio::test]
2954 async fn test_follow_logs_exits_on_stale() {
2955 use axum::{Router, extract::Path, extract::Query, routing::get};
2956
2957 let app = Router::new()
2958 .route(
2959 "/api/v1/sessions/{id}/output",
2960 get(
2961 |_path: Path<String>,
2962 _query: Query<std::collections::HashMap<String, String>>| async {
2963 r#"{"output":"stale output"}"#.to_owned()
2964 },
2965 ),
2966 )
2967 .route(
2968 "/api/v1/sessions/{id}",
2969 get(|_path: Path<String>| async {
2970 r#"{"id":"00000000-0000-0000-0000-000000000001","name":"test","workdir":"/tmp","provider":"claude","prompt":"test","status":"stale","mode":"interactive","conversation_id":null,"exit_code":null,"backend_session_id":null,"output_snapshot":null,"guard_config":null,"intervention_reason":null,"intervention_at":null,"last_output_at":null,"waiting_for_input":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}"#.to_owned()
2971 }),
2972 );
2973
2974 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2975 let addr = listener.local_addr().unwrap();
2976 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
2977 let base = format!("http://127.0.0.1:{}", addr.port());
2978
2979 let client = reqwest::Client::new();
2980 let mut buf = Vec::new();
2981 follow_logs(&client, &base, "test", 100, None, &mut buf)
2982 .await
2983 .unwrap();
2984
2985 let output = String::from_utf8(buf).unwrap();
2986 assert!(output.contains("stale output"));
2987 }
2988
2989 #[tokio::test]
2990 async fn test_execute_logs_follow_non_reqwest_error() {
2991 use axum::{Router, extract::Path, extract::Query, routing::get};
2992
2993 let app = Router::new()
2995 .route(
2996 "/api/v1/sessions/{id}/output",
2997 get(
2998 |_path: Path<String>,
2999 _query: Query<std::collections::HashMap<String, String>>| async {
3000 r#"{"output":"initial"}"#.to_owned()
3001 },
3002 ),
3003 )
3004 .route(
3005 "/api/v1/sessions/{id}",
3006 get(|_path: Path<String>| async { "not valid json".to_owned() }),
3007 );
3008
3009 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
3010 let addr = listener.local_addr().unwrap();
3011 tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
3012 let node = format!("127.0.0.1:{}", addr.port());
3013
3014 let cli = Cli {
3015 node,
3016 token: None,
3017 command: Commands::Logs {
3018 name: "test".into(),
3019 lines: 100,
3020 follow: true,
3021 },
3022 };
3023 let err = execute(&cli).await.unwrap_err();
3024 let msg = err.to_string();
3026 assert!(
3027 msg.contains("expected ident"),
3028 "Expected serde parse error, got: {msg}"
3029 );
3030 }
3031
/// `spawn` accepts `--max-turns`, `--max-budget` and `--output-format` and stores their values.
#[test]
fn test_cli_parse_spawn_with_guardrails() {
    let argv = [
        "pulpo",
        "spawn",
        "--workdir",
        "/tmp",
        "--max-turns",
        "10",
        "--max-budget",
        "5.5",
        "--output-format",
        "json",
        "Do it",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Spawn {
        max_turns,
        max_budget,
        output_format,
        ..
    } = &parsed.command
    else {
        panic!("expected Spawn command");
    };
    assert_eq!(*max_turns, Some(10));
    assert_eq!(*max_budget, Some(5.5));
    assert_eq!(output_format.as_deref(), Some("json"));
}
3055
3056 #[tokio::test]
3057 async fn test_fetch_session_status_connection_error() {
3058 let client = reqwest::Client::new();
3059 let result = fetch_session_status(&client, "http://127.0.0.1:1", "test", None).await;
3060 assert!(result.is_err());
3061 }
3062
/// The generated crontab line is the cron expression, the `pulpo spawn` invocation, and a
/// trailing `#pulpo:<name>` marker, terminated by a newline.
#[test]
fn test_build_crontab_line() {
    let expected = "0 3 * * * pulpo --node localhost:7433 spawn --workdir /home/me/repo --provider claude --auto Review PRs #pulpo:nightly-review\n";
    let actual = build_crontab_line(
        "nightly-review",
        "0 3 * * *",
        "/home/me/repo",
        "claude",
        "Review PRs",
        "localhost:7433",
    );
    assert_eq!(actual, expected);
}
3080
/// Installing keeps the existing crontab content and appends the new line at the end.
#[test]
fn test_crontab_install_success() {
    let existing = "# existing cron\n0 * * * * echo hi\n";
    let new_line = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:my-job\n";
    let updated = crontab_install(existing, "my-job", new_line).unwrap();

    assert!(updated.starts_with("# existing cron\n"));
    assert!(updated.contains("echo hi"));
    assert!(updated.ends_with("#pulpo:my-job\n"));
}
3090
/// Installing a schedule whose name is already present fails with "already exists".
#[test]
fn test_crontab_install_duplicate_error() {
    let existing = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let new_line = "0 4 * * * pulpo spawn other #pulpo:my-job\n";
    let message = crontab_install(existing, "my-job", new_line)
        .unwrap_err()
        .to_string();
    assert!(message.contains("already exists"));
}
3098
/// An empty crontab lists as the "no schedules" placeholder.
#[test]
fn test_crontab_list_empty() {
    let listing = crontab_list("");
    assert_eq!(listing, "No pulpo schedules.");
}
3103
/// A crontab without any `#pulpo:` entries lists as the "no schedules" placeholder.
#[test]
fn test_crontab_list_no_pulpo_entries() {
    let listing = crontab_list("0 * * * * echo hi\n");
    assert_eq!(listing, "No pulpo schedules.");
}
3108
/// An active pulpo entry is listed with headers, its name, its cron expression and "no" for paused.
#[test]
fn test_crontab_list_with_entries() {
    let crontab = "0 3 * * * pulpo --node n spawn --workdir /tmp --provider claude --auto task #pulpo:nightly\n";
    let listing = crontab_list(crontab);

    let expected_fragments = ["NAME", "CRON", "PAUSED", "nightly", "0 3 * * *", "no"];
    for fragment in expected_fragments {
        assert!(listing.contains(fragment), "missing {fragment:?} in output");
    }
}
3120
/// A commented-out pulpo entry is listed by name and marked as paused ("yes").
#[test]
fn test_crontab_list_paused_entry() {
    let listing = crontab_list("#0 3 * * * pulpo spawn task #pulpo:paused-job\n");
    for fragment in ["paused-job", "yes"] {
        assert!(listing.contains(fragment), "missing {fragment:?} in output");
    }
}
3128
/// A pulpo entry without a full cron expression is still listed, with `?` in the output.
#[test]
fn test_crontab_list_short_line() {
    let listing = crontab_list("badcron #pulpo:broken\n");
    assert!(listing.contains("broken"));
    assert!(listing.contains('?'));
}
3137
/// Removing a schedule drops its line and leaves unrelated entries in place.
#[test]
fn test_crontab_remove_success() {
    let existing = "0 * * * * echo hi\n0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let updated = crontab_remove(existing, "my-job").unwrap();

    let keeps_unrelated = updated.contains("echo hi");
    let still_has_job = updated.contains("my-job");
    assert!(keeps_unrelated);
    assert!(!still_has_job);
}
3145
/// Removing an unknown schedule fails with a "not found" error.
#[test]
fn test_crontab_remove_not_found() {
    let message = crontab_remove("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found"));
}
3152
/// Pausing turns the entry into a `#`-prefixed line and keeps its `#pulpo:` marker.
#[test]
fn test_crontab_pause_success() {
    let existing = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let updated = crontab_pause(existing, "my-job").unwrap();

    assert!(updated.starts_with('#'));
    assert!(updated.contains("#pulpo:my-job"));
}
3160
/// Pausing an unknown schedule fails with "not found or already paused".
#[test]
fn test_crontab_pause_not_found() {
    let message = crontab_pause("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found or already paused"));
}
3167
/// Pausing an entry that is already commented out fails with an "already paused" error.
#[test]
fn test_crontab_pause_already_paused() {
    let existing = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let message = crontab_pause(existing, "my-job").unwrap_err().to_string();
    assert!(message.contains("already paused"));
}
3174
/// Resuming removes the leading `#` from a paused entry and keeps its `#pulpo:` marker.
#[test]
fn test_crontab_resume_success() {
    let existing = "#0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let updated = crontab_resume(existing, "my-job").unwrap();

    let still_commented = updated.starts_with('#');
    assert!(!still_commented);
    assert!(updated.contains("#pulpo:my-job"));
}
3182
/// Resuming an unknown schedule fails with "not found or not paused".
#[test]
fn test_crontab_resume_not_found() {
    let message = crontab_resume("0 * * * * echo hi\n", "ghost")
        .unwrap_err()
        .to_string();
    assert!(message.contains("not found or not paused"));
}
3189
/// Resuming an entry that is currently active fails with a "not paused" error.
#[test]
fn test_crontab_resume_not_paused() {
    let existing = "0 3 * * * pulpo spawn task #pulpo:my-job\n";
    let message = crontab_resume(existing, "my-job").unwrap_err().to_string();
    assert!(message.contains("not paused"));
}
3196
/// `schedule install` captures name, cron, workdir and the multi-word prompt; with no
/// `--provider` given, the provider is "claude".
#[test]
fn test_cli_parse_schedule_install() {
    let argv = [
        "pulpo",
        "schedule",
        "install",
        "nightly",
        "0 3 * * *",
        "--workdir",
        "/tmp/repo",
        "Review",
        "PRs",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Schedule {
        action:
            ScheduleAction::Install {
                name,
                cron,
                workdir,
                provider,
                prompt,
            },
    } = &parsed.command
    else {
        panic!("expected Schedule Install command");
    };
    assert_eq!(name, "nightly");
    assert_eq!(cron, "0 3 * * *");
    assert_eq!(workdir, "/tmp/repo");
    assert_eq!(provider, "claude");
    assert_eq!(prompt, &["Review", "PRs"]);
}
3221
/// `schedule list` parses into `ScheduleAction::List`.
#[test]
fn test_cli_parse_schedule_list() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "list"]).unwrap();
    let is_schedule_list = matches!(
        parsed.command,
        Commands::Schedule {
            action: ScheduleAction::List
        }
    );
    assert!(is_schedule_list);
}
3232
/// `schedule remove <name>` parses into `ScheduleAction::Remove` with that name.
#[test]
fn test_cli_parse_schedule_remove() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "remove", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Remove { name },
    } = &parsed.command
    else {
        panic!("expected Schedule Remove command");
    };
    assert_eq!(name, "nightly");
}
3243
/// `schedule pause <name>` parses into `ScheduleAction::Pause` with that name.
#[test]
fn test_cli_parse_schedule_pause() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "pause", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Pause { name },
    } = &parsed.command
    else {
        panic!("expected Schedule Pause command");
    };
    assert_eq!(name, "nightly");
}
3254
/// `schedule resume <name>` parses into `ScheduleAction::Resume` with that name.
#[test]
fn test_cli_parse_schedule_resume() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "resume", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Resume { name },
    } = &parsed.command
    else {
        panic!("expected Schedule Resume command");
    };
    assert_eq!(name, "nightly");
}
3265
/// The alias `sched` resolves to `Commands::Schedule`.
#[test]
fn test_cli_parse_schedule_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "sched", "list"]).unwrap();
    let is_schedule_list = matches!(
        parsed.command,
        Commands::Schedule {
            action: ScheduleAction::List
        }
    );
    assert!(is_schedule_list);
}
3276
/// Under `schedule`, the alias `ls` resolves to `ScheduleAction::List`.
#[test]
fn test_cli_parse_schedule_list_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "ls"]).unwrap();
    let is_schedule_list = matches!(
        parsed.command,
        Commands::Schedule {
            action: ScheduleAction::List
        }
    );
    assert!(is_schedule_list);
}
3287
/// Under `schedule`, the alias `rm` resolves to `ScheduleAction::Remove`.
#[test]
fn test_cli_parse_schedule_remove_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "schedule", "rm", "nightly"]).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Remove { name },
    } = &parsed.command
    else {
        panic!("expected Schedule Remove command");
    };
    assert_eq!(name, "nightly");
}
3298
/// `schedule install --provider <p>` stores the given provider.
#[test]
fn test_cli_parse_schedule_install_custom_provider() {
    let argv = [
        "pulpo",
        "schedule",
        "install",
        "daily",
        "0 9 * * *",
        "--workdir",
        "/tmp",
        "--provider",
        "codex",
        "Run tests",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Schedule {
        action: ScheduleAction::Install { provider, .. },
    } = &parsed.command
    else {
        panic!("expected Schedule Install command");
    };
    assert_eq!(provider, "codex");
}
3321
3322 #[tokio::test]
3323 async fn test_execute_schedule_via_execute() {
3324 let node = start_test_server().await;
3326 let cli = Cli {
3327 node,
3328 token: None,
3329 command: Commands::Schedule {
3330 action: ScheduleAction::List,
3331 },
3332 };
3333 let result = execute(&cli).await;
3334 assert!(result.is_ok() || result.is_err());
3336 }
3337
/// The `Debug` rendering of `ScheduleAction::List` is the bare variant name.
#[test]
fn test_schedule_action_debug() {
    let rendered = format!("{:?}", ScheduleAction::List);
    assert_eq!(rendered, "List");
}
3343
/// `knowledge` with no flags parses into `Commands::Knowledge`.
#[test]
fn test_cli_parse_knowledge() {
    let parsed = Cli::try_parse_from(["pulpo", "knowledge"]).unwrap();
    let is_knowledge = matches!(parsed.command, Commands::Knowledge { .. });
    assert!(is_knowledge);
}
3351
/// The alias `kn` resolves to `Commands::Knowledge`.
#[test]
fn test_cli_parse_knowledge_alias() {
    let parsed = Cli::try_parse_from(["pulpo", "kn"]).unwrap();
    let is_knowledge = matches!(parsed.command, Commands::Knowledge { .. });
    assert!(is_knowledge);
}
3357
/// `knowledge` stores the `--kind`, `--repo`, `--ink` and `--limit` filter values.
#[test]
fn test_cli_parse_knowledge_with_filters() {
    let argv = [
        "pulpo",
        "knowledge",
        "--kind",
        "failure",
        "--repo",
        "/tmp/repo",
        "--ink",
        "coder",
        "--limit",
        "5",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Knowledge {
        kind,
        repo,
        ink,
        limit,
        ..
    } = &parsed.command
    else {
        panic!("expected Knowledge command");
    };
    assert_eq!(kind.as_deref(), Some("failure"));
    assert_eq!(repo.as_deref(), Some("/tmp/repo"));
    assert_eq!(ink.as_deref(), Some("coder"));
    assert_eq!(*limit, 5);
}
3389
/// `knowledge --context --repo <path>` sets the context flag and stores the repo.
#[test]
fn test_cli_parse_knowledge_context() {
    let argv = ["pulpo", "knowledge", "--context", "--repo", "/tmp/repo"];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let Commands::Knowledge { context, repo, .. } = &parsed.command else {
        panic!("expected Knowledge command");
    };
    assert!(*context);
    assert_eq!(repo.as_deref(), Some("/tmp/repo"));
}
3402
/// An empty knowledge list renders as a fixed placeholder message.
#[test]
fn test_format_knowledge_empty() {
    let rendered = format_knowledge(&[]);
    assert_eq!(rendered, "No knowledge found.");
}
3407
/// A populated knowledge list renders headers, kinds, a title, the repo and a two-decimal relevance.
#[test]
fn test_format_knowledge_items() {
    use chrono::Utc;
    use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
    use uuid::Uuid;

    // A repo- and ink-scoped summary.
    let scoped_summary = Knowledge {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        kind: KnowledgeKind::Summary,
        scope_repo: Some("/tmp/repo".into()),
        scope_ink: Some("coder".into()),
        title: "Fixed the auth bug".into(),
        body: "Details".into(),
        tags: vec!["claude".into(), "completed".into()],
        relevance: 0.7,
        created_at: Utc::now(),
    };
    // A failure entry with no repo or ink scope.
    let unscoped_failure = Knowledge {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        kind: KnowledgeKind::Failure,
        scope_repo: None,
        scope_ink: None,
        title: "OOM crash during build".into(),
        body: "Details".into(),
        tags: vec!["failure".into()],
        relevance: 0.9,
        created_at: Utc::now(),
    };
    let items = vec![scoped_summary, unscoped_failure];

    let rendered = format_knowledge(&items);
    let expected_fragments = [
        "KIND",
        "TITLE",
        "summary",
        "failure",
        "Fixed the auth bug",
        "repo",
        "0.70",
    ];
    for fragment in expected_fragments {
        assert!(rendered.contains(fragment), "missing {fragment:?} in output");
    }
}
3450
/// A long title is rendered with an ellipsis character (`…`).
#[test]
fn test_format_knowledge_long_title_truncated() {
    use chrono::Utc;
    use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
    use uuid::Uuid;

    let long_titled = Knowledge {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        kind: KnowledgeKind::Summary,
        scope_repo: Some("/repo".into()),
        scope_ink: None,
        title: "A very long title that exceeds the maximum display width for knowledge items in the CLI".into(),
        body: "Body".into(),
        tags: vec![],
        relevance: 0.5,
        created_at: Utc::now(),
    };
    let items = vec![long_titled];

    let rendered = format_knowledge(&items);
    assert!(rendered.contains('…'));
}
3473
/// `knowledge --get <id>` stores the requested id.
#[test]
fn test_cli_parse_knowledge_get() {
    let parsed = Cli::try_parse_from(["pulpo", "knowledge", "--get", "abc-123"]).unwrap();
    let Commands::Knowledge { get, .. } = &parsed.command else {
        panic!("expected Knowledge command");
    };
    assert_eq!(get.as_deref(), Some("abc-123"));
}
3484
/// `knowledge --delete <id>` stores the id to delete.
#[test]
fn test_cli_parse_knowledge_delete() {
    let parsed = Cli::try_parse_from(["pulpo", "knowledge", "--delete", "abc-123"]).unwrap();
    let Commands::Knowledge { delete, .. } = &parsed.command else {
        panic!("expected Knowledge command");
    };
    assert_eq!(delete.as_deref(), Some("abc-123"));
}
3495
/// `knowledge --push` sets the push flag.
#[test]
fn test_cli_parse_knowledge_push() {
    let parsed = Cli::try_parse_from(["pulpo", "knowledge", "--push"]).unwrap();
    let Commands::Knowledge { push, .. } = &parsed.command else {
        panic!("expected Knowledge command");
    };
    assert!(*push);
}
3506
/// An item with no repo scope renders output that contains a `-` character.
#[test]
fn test_format_knowledge_no_repo() {
    use chrono::Utc;
    use pulpo_common::knowledge::{Knowledge, KnowledgeKind};
    use uuid::Uuid;

    let unscoped = Knowledge {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        kind: KnowledgeKind::Summary,
        scope_repo: None,
        scope_ink: None,
        title: "Global finding".into(),
        body: "Body".into(),
        tags: vec![],
        relevance: 0.5,
        created_at: Utc::now(),
    };
    let items = vec![unscoped];

    let rendered = format_knowledge(&items);
    assert!(rendered.contains('-'));
}
3529}