1#![allow(clippy::missing_panics_doc)]
2
3use action_core as action;
4use cargo_metadata::DependencyKind;
5use color_eyre::{Section, eyre};
6use futures::Future;
7use futures::stream::{self, FuturesUnordered, StreamExt};
8use std::collections::{HashMap, VecDeque};
9use std::path::PathBuf;
10use std::pin::Pin;
11use std::sync::{Arc, Mutex, RwLock};
12use tokio::sync::Semaphore;
13use tokio::time::{Duration, Instant, interval, sleep};
14
/// Settings that control a workspace publish run.
#[derive(Debug)]
pub struct Options {
    /// Location of the workspace: either a manifest file, or a directory in
    /// which case `Cargo.toml` inside it is used.
    pub path: PathBuf,

    /// Token handed to `cargo publish` through `CARGO_REGISTRY_TOKEN`, if set.
    pub registry_token: Option<String>,

    /// Run `cargo publish --dry-run`; manifests are not rewritten on disk and
    /// the availability of the crate is not awaited.
    pub dry_run: bool,

    /// Pause after a published crate became available, before the package is
    /// marked as published (default: 30 seconds).
    pub publish_delay: Option<Duration>,

    /// Pass `--no-verify` to `cargo publish`.
    pub no_verify: bool,

    /// Rewrite the version requirement of local path dependencies to the exact
    /// version of the workspace member and publish with `--allow-dirty`.
    ///
    /// In combination with `dry_run`, packages with local dependencies skip
    /// `cargo publish --dry-run` altogether.
    pub resolve_versions: bool,

    /// Package names to publish; `None` or an empty list selects every package.
    pub include: Option<Vec<String>>,

    /// Package names that must not be published.
    pub exclude: Option<Vec<String>>,

    /// Upper bound on `cargo publish` attempts per package (default: 10).
    pub max_retries: Option<usize>,

    /// Maximum number of packages published at the same time (default: 4).
    pub concurrency_limit: Option<usize>,
}
63
/// A workspace package together with its place in the publish dependency graph.
struct Package {
    /// Package description as reported by `cargo metadata`.
    inner: cargo_metadata::Package,
    /// Directory that contains the package manifest.
    path: PathBuf,
    /// Whether the package is selected for publishing (publishable, included
    /// and not excluded).
    should_publish: bool,
    /// Set once the package was published, or once its dry-run finished.
    published: Mutex<bool>,
    /// Local workspace packages this package depends on, keyed by package name.
    deps: RwLock<HashMap<String, Arc<Package>>>,
    /// Workspace packages that depend on this package, keyed by package name.
    dependants: RwLock<HashMap<String, Arc<Package>>>,
}
73
74impl std::fmt::Debug for Package {
75 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
76 write!(f, "{self}")
77 }
78}
79
80impl std::fmt::Display for Package {
81 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
82 f.debug_struct("Package")
83 .field("name", &self.inner.name)
84 .field("version", &self.inner.version.to_string())
85 .field(
86 "deps",
87 &self.deps.read().unwrap().keys().collect::<Vec<_>>(),
88 )
89 .field(
90 "dependants",
91 &self.dependants.read().unwrap().keys().collect::<Vec<_>>(),
92 )
93 .finish()
94 }
95}
96
97impl Package {
98 pub fn published(&self) -> bool {
100 *self.published.lock().unwrap()
101 }
102
103 pub fn ready(&self) -> bool {
107 self.deps.read().unwrap().values().all(|d| d.published())
108 }
109
110 pub async fn is_available(&self) -> eyre::Result<bool> {
112 use crates_io_api::{AsyncClient, Error as RegistryError};
113 use semver::Version;
114
115 let api = AsyncClient::new(
116 "publish_crates (https://github.com/romnn/publish-crates)",
117 std::time::Duration::from_millis(1000),
118 )?;
119
120 let info = match api.get_crate(&self.inner.name).await {
121 Ok(info) => info,
122 Err(RegistryError::NotFound(_)) => return Ok(false),
123 Err(err) => return Err(err.into()),
124 };
125
126 let mut versions = info
127 .versions
128 .iter()
129 .filter_map(|v| match Version::parse(&v.num) {
130 Ok(version) => Some((version, v)),
131 Err(_) => None,
132 });
133 let Some((_, version)) = versions.find(|(ver, _)| ver == &self.inner.version) else {
134 return Ok(false);
135 };
136
137 let client = reqwest::Client::new();
138 let dl_response = client
139 .head(format!("https://crates.io{}", version.dl_path))
140 .send()
141 .await?;
142 Ok(dl_response.status() == reqwest::StatusCode::OK)
143 }
144
145 pub async fn wait_package_available(
147 &self,
148 timeout: impl Into<Option<Duration>>,
149 ) -> eyre::Result<()> {
150 let timeout = timeout
151 .into()
152 .unwrap_or_else(|| Duration::from_secs(2 * 60));
153 let start = Instant::now();
154 let mut ticker = interval(Duration::from_secs(5));
155 loop {
156 ticker.tick().await;
157 action::info!(
158 "[{}@{}] checking if available",
159 self.inner.name,
160 self.inner.version,
161 );
162 if self.is_available().await? {
163 return Ok(());
164 }
165 if Instant::now().duration_since(start) > timeout {
167 eyre::bail!(
168 "exceeded timeout of {:?} waiting for crate {} {} to be published",
169 timeout,
170 self.inner.name,
171 self.inner.version.to_string()
172 );
173 }
174 }
175 }
176
177 pub async fn publish(self: Arc<Self>, options: Arc<Options>) -> eyre::Result<Arc<Self>> {
179 use async_process::Command;
180
181 action::info!("[{}@{}] publishing", self.inner.name, self.inner.version);
182
183 let mut cmd = Command::new("cargo");
184 cmd.arg("publish");
185
186 if options.no_verify {
187 cmd.arg("--no-verify");
188 }
189 cmd.current_dir(&self.path);
190 if let Some(ref token) = options.registry_token {
191 cmd.env("CARGO_REGISTRY_TOKEN", token);
192 }
193 if options.dry_run {
194 cmd.arg("--dry-run");
195 if options.resolve_versions && !self.deps.read().unwrap().is_empty() {
198 action::info!(
201 "[{}@{}] dry-run: proceed without `cargo publish --dry-run` due to resolve version incompatibility",
202 self.inner.name,
203 self.inner.version
204 );
205 *self.published.lock().unwrap() = true;
206 return Ok(self);
207 }
208 }
209 if options.resolve_versions {
210 cmd.arg("--allow-dirty");
212 }
213
214 let max_retries = options.max_retries.unwrap_or(10);
215 let mut attempt = 0;
216 loop {
217 attempt += 1;
218
219 let output = cmd.output().await?;
220 let stdout = String::from_utf8_lossy(&output.stdout);
221 let stderr = String::from_utf8_lossy(&output.stderr);
222 action::debug!("{}", &stdout);
223 action::debug!("{}", &stderr);
224
225 if !output.status.success() {
226 action::warning!("{}", &stdout);
227 action::warning!("{}", &stderr);
228
229 if stderr.contains("already exists on crates.io index") {
245 break;
246 }
247
248 let error = classify_publish_error(&stderr);
249 let wait_duration = match error {
250 PublishError::Fatal(_) => {
251 eyre::bail!("command {:?} failed: {}", cmd, stderr);
252 }
253 PublishError::Retryable(code) => {
254 action::warning!(
255 "[{}@{}] intermittent failure: {} {}",
256 self.inner.name,
257 self.inner.version,
258 code.as_u16(),
259 code.canonical_reason().unwrap_or_default(),
260 );
261 match code {
262 http::StatusCode::TOO_MANY_REQUESTS => {
263 std::time::Duration::from_secs(10 * 60)
265 }
266 _ => std::time::Duration::from_secs(5 * 60),
267 }
268 }
269 PublishError::Unknown => {
270 action::warning!(
271 "[{}@{}] unknown failure",
272 self.inner.name,
273 self.inner.version,
274 );
275 std::time::Duration::from_secs(5 * 60)
277 }
278 };
279
280 if attempt >= max_retries {
281 eyre::bail!("command {:?} failed: {}", cmd, stderr);
282 }
283
284 action::warning!(
285 "[{}@{}] attempting again in {wait_duration:?}",
286 self.inner.name,
287 self.inner.version
288 );
289 sleep(wait_duration).await;
290 } else {
291 break;
292 }
293 }
294
295 if options.dry_run {
296 action::info!(
297 "[{}@{}] dry-run: skip waiting for successful publish",
298 &self.inner.name,
299 self.inner.version
300 );
301 *self.published.lock().unwrap() = true;
302 return Ok(self);
303 }
304
305 self.wait_package_available(None).await?;
307
308 sleep(
309 options
310 .publish_delay
311 .unwrap_or_else(|| Duration::from_secs(30)),
312 )
313 .await;
314
315 let mut cmd = Command::new("cargo");
316 cmd.arg("update");
317 cmd.current_dir(&self.path);
318 let output = cmd.output().await?;
319 if !output.status.success() {
320 eyre::bail!("command {:?} failed", cmd);
321 }
322
323 *self.published.lock().unwrap() = true;
324 action::info!(
325 "[{}@{}] published successfully",
326 self.inner.name,
327 self.inner.version
328 );
329
330 Ok(self)
331 }
332}
333
/// Future of a single publish task: resolves to the package that was
/// published, or to the error that made publishing it fail.
type TaskFut = dyn Future<Output = eyre::Result<Arc<Package>>>;
335
336fn find_packages(
337 metadata: &cargo_metadata::Metadata,
338 options: Arc<Options>,
339) -> impl Iterator<Item = (PathBuf, Arc<Package>)> + '_ {
340 let packages = metadata.workspace_packages();
341 packages.into_iter().filter_map(move |package| {
342 let should_publish = package.publish.as_ref().is_none_or(|p| !p.is_empty());
343
344 let is_included = options
345 .include
346 .as_ref()
347 .is_none_or(|inc| inc.is_empty() || inc.contains(&package.name));
348
349 let is_excluded = options
350 .exclude
351 .as_ref()
352 .is_some_and(|excl| excl.contains(&package.name));
353
354 let should_publish = should_publish && is_included && !is_excluded;
355
356 let path: PathBuf = package.manifest_path.parent()?.into();
357 Some((
358 path.clone(),
359 Arc::new(Package {
360 inner: package.clone(),
361 path,
362 should_publish,
363 published: Mutex::new(false),
364 deps: RwLock::new(HashMap::new()),
365 dependants: RwLock::new(HashMap::new()),
366 }),
367 ))
368 })
369}
370
371async fn build_dag(
372 packages: Arc<HashMap<PathBuf, Arc<Package>>>,
373 options: Arc<Options>,
374) -> eyre::Result<()> {
375 let packages_iter = packages.values().filter(|p| p.should_publish);
376 let results: Vec<_> = stream::iter(packages_iter)
377 .map(|p| {
378 let packages = packages.clone();
379 let options = options.clone();
380 async move {
381 use toml_edit::{value, DocumentMut};
382 let manifest_path = &p.inner.manifest_path;
383 let manifest = tokio::fs::read_to_string(manifest_path).await?;
384 let mut manifest = manifest.parse::<DocumentMut>()?;
385 let mut need_update = false;
386
387 for dep in &p.inner.dependencies {
388 let mut dep_version = dep.req.clone();
390 if let Some(path) = dep.path.as_ref().map(PathBuf::from) {
391 let resolved = packages.get(&path).ok_or(eyre::eyre!(
395 "{}: could not resolve local dependency {}",
396 &p.inner.name,
397 path.display()
398 ))?;
399
400 if !resolved.should_publish {
404 eyre::bail!(
405 "{}: cannot publish because dependency {} will not be published",
406 &p.inner.name,
407 &dep.name,
408 );
409 }
410
411 if options.resolve_versions {
412 dep_version = semver::VersionReq {
414 comparators: vec![semver::Comparator {
415 op: semver::Op::Exact,
416 major: resolved.inner.version.major,
417 minor: Some(resolved.inner.version.minor),
418 patch: Some(resolved.inner.version.patch),
419 pre: semver::Prerelease::EMPTY,
420 }],
421 };
422
423 let changed = dep_version != dep.req;
424 if changed {
425 if let Some(kind) = match dep.kind {
427 DependencyKind::Normal => Some("dependencies"),
428 DependencyKind::Development => Some("dev-dependencies"),
429 DependencyKind::Build => Some("build-dependencies"),
430 _ => None,
431 } {
432 manifest[kind][&dep.name]["version"] =
434 value(dep_version.to_string());
435 manifest[kind][&dep.name]
436 .as_inline_table_mut()
437 .map(toml_edit::InlineTable::fmt);
438 need_update = true;
439 }
440 }
441 }
442
443 p.deps
444 .write()
445 .unwrap()
446 .insert(resolved.inner.name.clone(), resolved.clone());
447
448 resolved
449 .dependants
450 .write()
451 .unwrap()
452 .insert(p.inner.name.clone(), p.clone());
453 }
454
455 let is_dev_dep = dep.kind == DependencyKind::Development;
456 let is_non_local_dep = !is_dev_dep || dep.path.is_none();
457 let is_missing_exact_version = dep_version == semver::VersionReq::STAR;
458
459 if is_missing_exact_version && is_non_local_dep {
460 return Err(eyre::eyre!(
461 "{}: dependency {} has no specific version ({})",
462 &p.inner.name,
463 &dep.name,
464 dep_version
465 ).suggestion("to automatically resolve versions of local workspace members, use '--resolve-versions'"));
466 }
467 }
468
469 if !options.dry_run && need_update {
471 use tokio::io::AsyncWriteExt;
472 action::debug!("{}", &manifest.to_string());
473 action::info!("[{}@{}] updating {}", p.inner.name, p.inner.version, p.inner.manifest_path);
474 let mut f = tokio::fs::OpenOptions::new()
475 .write(true)
476 .truncate(true)
477 .open(&p.inner.manifest_path)
478 .await?;
479 f.write_all(manifest.to_string().as_bytes()).await?;
480 }
481
482 Ok(())
483 }
484 })
485 .buffer_unordered(8)
486 .collect()
487 .await;
488
489 results.into_iter().collect::<eyre::Result<Vec<_>>>()?;
491 Ok(())
492}
493
494pub async fn publish(options: Arc<Options>) -> eyre::Result<()> {
499 action::info!("searching cargo packages at {}", options.path.display());
500
501 let manifest_path = if options.path.is_file() {
502 options.path.clone()
503 } else {
504 options.path.join("Cargo.toml")
505 };
506 let metadata = cargo_metadata::MetadataCommand::new()
507 .manifest_path(&manifest_path)
508 .exec()?;
509
510 let packages: Arc<HashMap<PathBuf, Arc<Package>>> =
511 Arc::new(find_packages(&metadata, options.clone()).collect::<HashMap<_, _>>());
512
513 build_dag(packages.clone(), options.clone()).await?;
514
515 action::info!(
516 "found packages: {:?}",
517 packages
518 .values()
519 .map(|p| p.inner.name.clone())
520 .collect::<Vec<_>>()
521 );
522
523 if packages.is_empty() {
524 return Ok(());
526 }
527 let mut ready: VecDeque<Arc<Package>> =
528 packages.values().filter(|p| p.ready()).cloned().collect();
529
530 let mut tasks: FuturesUnordered<Pin<Box<TaskFut>>> = FuturesUnordered::new();
531
532 let limit = options.concurrency_limit.unwrap_or(4);
533 let limit = Arc::new(Semaphore::new(limit));
534
535 loop {
536 if tasks.is_empty() && ready.is_empty() {
538 break;
539 }
540
541 loop {
543 let Ok(permit) = limit.clone().try_acquire_owned() else {
544 break;
545 };
546 match ready.pop_front() {
548 Some(p) if !p.should_publish => {
549 action::info!(
550 "[{}@{}] skipping (publish=false)",
551 p.inner.name,
552 p.inner.version
553 );
554 }
555 Some(p) => {
556 let options_clone = options.clone();
557 tasks.push(Box::pin(async move {
558 let res = p.publish(options_clone).await;
559 drop(permit);
560 res
561 }));
562 }
563 None => break,
565 }
566 }
567
568 match tasks.next().await {
570 Some(Err(err)) => {
571 eyre::bail!("a task failed: {}", err)
572 }
573 Some(Ok(completed)) => {
574 ready.extend(
576 completed
577 .dependants
578 .read()
579 .unwrap()
580 .values()
581 .filter(|d| d.ready() && !d.published())
582 .cloned(),
583 );
584 }
585 None => {}
586 }
587 }
588
589 if !packages
590 .values()
591 .all(|p| !p.should_publish || p.published())
592 {
593 eyre::bail!("not all published");
594 }
595
596 Ok(())
597}
598
/// Classification of a failed `cargo publish` run, derived from the HTTP
/// status mentioned in its output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PublishError {
    /// No known HTTP status was found in the output.
    Unknown,
    /// The status indicates a failure that is worth another attempt.
    Retryable(http::StatusCode),
    /// The status indicates a failure that is not retried.
    Fatal(http::StatusCode),
}
606
607impl PublishError {
608 pub fn code(&self) -> Option<&http::StatusCode> {
609 match self {
610 Self::Unknown => None,
611 Self::Retryable(code) | Self::Fatal(code) => Some(code),
612 }
613 }
614}
615
616fn classify_publish_error(text: &str) -> PublishError {
626 for code_num in 100u16..=599 {
627 let Ok(code) = http::StatusCode::from_u16(code_num) else {
628 continue;
629 };
630 let Some(reason) = code.canonical_reason() else {
631 continue;
632 };
633
634 let needle = format!("{} {}", code.as_str(), reason);
635 if text.contains(&needle) {
636 if code.is_redirection()
637 || code.is_server_error()
638 || code == http::StatusCode::NOT_FOUND
639 || code == http::StatusCode::REQUEST_TIMEOUT
640 || code == http::StatusCode::CONFLICT
641 || code == http::StatusCode::GONE
642 || code == http::StatusCode::PRECONDITION_FAILED
643 || code == http::StatusCode::RANGE_NOT_SATISFIABLE
644 || code == http::StatusCode::EXPECTATION_FAILED
645 || code == http::StatusCode::MISDIRECTED_REQUEST
646 || code == http::StatusCode::UNPROCESSABLE_ENTITY
647 || code == http::StatusCode::LOCKED
648 || code == http::StatusCode::FAILED_DEPENDENCY
649 || code == http::StatusCode::TOO_EARLY
650 || code == http::StatusCode::UPGRADE_REQUIRED
651 || code == http::StatusCode::PRECONDITION_REQUIRED
652 || code == http::StatusCode::TOO_MANY_REQUESTS
653 || code == http::StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS
654 || code == http::StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS
655 {
656 return PublishError::Retryable(code);
657 } else {
658 return PublishError::Fatal(code);
659 }
660 }
661 }
662
663 PublishError::Unknown
664}
665
#[cfg(test)]
mod tests {
    use super::PublishError;
    use similar_asserts::assert_eq as sim_assert_eq;

    /// Known error outputs of `cargo publish` map to the expected class.
    #[test]
    fn classify_publish_error() {
        let cases = [
            (
                "the remote server responded with an error (status 429 Too Many Requests): You have published too many new crates in a short period of time. Please try again after Mon, 21 Apr 2025 19:31:32 GMT or email help@crates.io to have your limit increased.",
                PublishError::Retryable(http::StatusCode::TOO_MANY_REQUESTS),
            ),
            (
                "the remote server responded with an error (status 500 Internal Server Error): Internal Server Error",
                PublishError::Retryable(http::StatusCode::INTERNAL_SERVER_ERROR),
            ),
            (
                "the remote server responded with some error we don't know more about",
                PublishError::Unknown,
            ),
        ];

        for (output, expected) in cases {
            sim_assert_eq!(super::classify_publish_error(output), expected);
        }
    }
}