publish_crates/
lib.rs

1use action_core as action;
2use cargo_metadata::DependencyKind;
3use color_eyre::{Section, eyre};
4use futures::Future;
5use futures::stream::{self, FuturesUnordered, StreamExt};
6use std::collections::{HashMap, VecDeque};
7use std::path::PathBuf;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex, RwLock};
10use tokio::sync::Semaphore;
11use tokio::time::{Duration, Instant, interval, sleep};
12
13const DATETIME_FORMAT: &'static [time::format_description::BorrowedFormatItem<'static>] =
14    time::macros::format_description!("[hour]:[minute]:[second]");
15
16/// Options for publishing packages.
17#[derive(Debug)]
18pub struct Options {
19    /// Path to package or workspace
20    pub path: PathBuf,
21
22    /// Cargo registry token
23    pub registry_token: Option<String>,
24
25    /// Perform dry-run
26    /// This will perform all checks without publishing the package
27    pub dry_run: bool,
28
29    /// Delay before attempting to publish dependent crate
30    pub publish_delay: Option<Duration>,
31
32    /// Disable pre-publish validation checks
33    pub no_verify: bool,
34
35    /// Resolve missing versions for local packages.
36    ///
37    /// Versions of local packages that use `{ path = "../some/path" }`
38    /// will be resolved to the version of the package the `path` is pointing to.
39    /// Note that even if `version` is present, the resolved value will be used.
40    ///
41    /// **Note**: This will update your `Cargo.toml` manifest with the resolved version.
42    pub resolve_versions: bool,
43
44    /// Packages that should be published
45    ///
46    /// If using explicit include, specify all package names you wish to publish
47    pub include: Option<Vec<String>>,
48
49    /// Packages that should not be published
50    ///
51    /// Excluded package names have precedence over included package names.
52    pub exclude: Option<Vec<String>>,
53
54    /// Maximum number of retries when encountering intermittent errors.
55    ///
56    /// Common intermittent failures are:
57    /// - 500 Internal Server Error
58    /// - 429 Too Many Requests
59    pub max_retries: Option<usize>,
60
61    /// Maximum number of packages to publish concurrently.
62    pub concurrency_limit: Option<usize>,
63}
64
65/// A cargo package.
66struct Package {
67    inner: cargo_metadata::Package,
68    path: PathBuf,
69    should_publish: bool,
70    published: Mutex<bool>,
71    deps: RwLock<HashMap<String, Arc<Package>>>,
72    dependants: RwLock<HashMap<String, Arc<Package>>>,
73}
74
75impl std::fmt::Debug for Package {
76    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
77        write!(f, "{self}")
78    }
79}
80
81impl std::fmt::Display for Package {
82    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
83        f.debug_struct("Package")
84            .field("name", &self.inner.name)
85            .field("version", &self.inner.version.to_string())
86            .field(
87                "deps",
88                &self.deps.read().unwrap().keys().collect::<Vec<_>>(),
89            )
90            .field(
91                "dependants",
92                &self.dependants.read().unwrap().keys().collect::<Vec<_>>(),
93            )
94            .finish()
95    }
96}
97
98impl Package {
99    /// Returns `true` if the package has been successfully published.
100    pub fn published(&self) -> bool {
101        *self.published.lock().unwrap()
102    }
103
104    /// Checks if the package is ready for publishing.
105    ///
106    /// A package can be published if all its dependencies have been published.
107    pub fn ready(&self) -> bool {
108        self.deps.read().unwrap().values().all(|d| d.published())
109    }
110
111    /// Wait until the published package is available on the registry.
112    pub async fn is_available(&self) -> eyre::Result<bool> {
113        use crates_io_api::{AsyncClient, Error as RegistryError};
114        use semver::Version;
115
116        let api = AsyncClient::new(
117            "publish_crates (https://github.com/romnn/publish-crates)",
118            std::time::Duration::from_millis(1000),
119        )?;
120
121        let info = match api.get_crate(&self.inner.name).await {
122            Ok(info) => info,
123            Err(RegistryError::NotFound(_)) => return Ok(false),
124            Err(err) => return Err(err.into()),
125        };
126
127        let mut versions = info
128            .versions
129            .iter()
130            .filter_map(|v| match Version::parse(&v.num) {
131                Ok(version) => Some((version, v)),
132                Err(_) => None,
133            });
134        let Some((_, version)) = versions.find(|(ver, _)| ver == &self.inner.version) else {
135            return Ok(false);
136        };
137
138        let client = reqwest::Client::new();
139        let dl_response = client
140            .head(format!("https://crates.io{}", version.dl_path))
141            .send()
142            .await?;
143        Ok(dl_response.status() == reqwest::StatusCode::OK)
144    }
145
146    /// Wait until the published package is available on the registry.
147    pub async fn wait_package_available(
148        &self,
149        timeout: impl Into<Option<Duration>>,
150    ) -> eyre::Result<()> {
151        let timeout = timeout
152            .into()
153            .unwrap_or_else(|| Duration::from_secs(2 * 60));
154        let start = Instant::now();
155        let mut ticker = interval(Duration::from_secs(5));
156        loop {
157            ticker.tick().await;
158            action::info!(
159                "[{}@{}] checking if available",
160                self.inner.name,
161                self.inner.version,
162            );
163            if self.is_available().await? {
164                return Ok(());
165            }
166            // check for timeout
167            if Instant::now().duration_since(start) > timeout {
168                eyre::bail!(
169                    "exceeded timeout of {:?} waiting for crate {} {} to be published",
170                    timeout,
171                    self.inner.name,
172                    self.inner.version.to_string()
173                );
174            }
175        }
176    }
177
178    /// Publishes this package
179    pub async fn publish(self: Arc<Self>, options: Arc<Options>) -> eyre::Result<Arc<Self>> {
180        use async_process::Command;
181
182        action::info!("[{}@{}] publishing", self.inner.name, self.inner.version);
183
184        let mut cmd = Command::new("cargo");
185        cmd.arg("publish");
186
187        if options.no_verify {
188            cmd.arg("--no-verify");
189        }
190        cmd.current_dir(&self.path);
191        if let Some(ref token) = options.registry_token {
192            cmd.env("CARGO_REGISTRY_TOKEN", token);
193        }
194        if options.dry_run {
195            cmd.arg("--dry-run");
196            // skip checking if local package versions are available on crates.io as they are not
197            // published during dry-run
198            if options.resolve_versions && !self.deps.read().unwrap().is_empty() {
199                // cmd.arg("--offline");
200                // skip cargo dry-run as it will always fail
201                action::info!(
202                    "[{}@{}] dry-run: proceed without `cargo publish --dry-run` due to resolve version incompatibility",
203                    self.inner.name,
204                    self.inner.version
205                );
206                *self.published.lock().unwrap() = true;
207                return Ok(self);
208            }
209        }
210        if options.resolve_versions {
211            // when resolving versions, we may write to Cargo.toml
212            cmd.arg("--allow-dirty");
213        }
214
215        let max_retries = options.max_retries.unwrap_or(10);
216        let mut attempt = 0;
217
218        loop {
219            attempt += 1;
220
221            if attempt > 1 {
222                action::warning!(
223                    "[{}@{}] publishing (attempt {}/{})",
224                    self.inner.name,
225                    self.inner.version,
226                    attempt,
227                    max_retries
228                );
229            }
230
231            let output = cmd.output().await?;
232            let stdout = String::from_utf8_lossy(&output.stdout);
233            let stderr = String::from_utf8_lossy(&output.stderr);
234            action::debug!("{}", &stdout);
235            action::debug!("{}", &stderr);
236
237            if !output.status.success() {
238                action::warning!("{}", &stdout);
239                action::warning!("{}", &stderr);
240
241                if stderr.contains("already exists on crates.io index") {
242                    break;
243                }
244
245                let error = classify_publish_error(&stderr);
246                let wait_duration = match error {
247                    PublishError::Fatal(_) => {
248                        eyre::bail!("command {:?} failed: {}", cmd, stderr);
249                    }
250                    PublishError::Retryable(code) => {
251                        action::warning!(
252                            "[{}@{}] intermittent failure: {} {}",
253                            self.inner.name,
254                            self.inner.version,
255                            code.as_u16(),
256                            code.canonical_reason().unwrap_or_default(),
257                        );
258                        match code {
259                            http::StatusCode::TOO_MANY_REQUESTS => {
260                                // 10 minutes
261                                std::time::Duration::from_secs(10 * 60)
262                            }
263                            // 5 minutes
264                            _ => std::time::Duration::from_secs(5 * 60),
265                        }
266                    }
267                    PublishError::Unknown => {
268                        action::warning!(
269                            "[{}@{}] unknown failure",
270                            self.inner.name,
271                            self.inner.version,
272                        );
273                        // 5 minutes
274                        std::time::Duration::from_secs(5 * 60)
275                    }
276                };
277
278                if attempt >= max_retries {
279                    eyre::bail!("command {:?} failed: {}", cmd, stderr);
280                }
281
282                let next_attempt = std::time::SystemTime::now() + wait_duration;
283                action::warning!(
284                    "[{}@{}] attempting again in {wait_duration:?} at {}",
285                    self.inner.name,
286                    self.inner.version,
287                    time::OffsetDateTime::from(next_attempt)
288                        .format(DATETIME_FORMAT)
289                        .unwrap_or_else(|_| humantime::format_rfc3339(next_attempt).to_string())
290                );
291                sleep(wait_duration).await;
292            } else {
293                break;
294            }
295        }
296
297        if options.dry_run {
298            action::info!(
299                "[{}@{}] dry-run: skip waiting for successful publish",
300                &self.inner.name,
301                self.inner.version
302            );
303            *self.published.lock().unwrap() = true;
304            return Ok(self);
305        }
306
307        // wait for package to be available on the registry
308        self.wait_package_available(None).await?;
309
310        sleep(
311            options
312                .publish_delay
313                .unwrap_or_else(|| Duration::from_secs(30)),
314        )
315        .await;
316
317        let mut cmd = Command::new("cargo");
318        cmd.arg("update");
319        cmd.current_dir(&self.path);
320        let output = cmd.output().await?;
321        if !output.status.success() {
322            eyre::bail!("command {:?} failed", cmd);
323        }
324
325        *self.published.lock().unwrap() = true;
326        action::info!(
327            "[{}@{}] published successfully",
328            self.inner.name,
329            self.inner.version
330        );
331
332        Ok(self)
333    }
334}
335
336type TaskFut = dyn Future<Output = eyre::Result<Arc<Package>>>;
337
338fn find_packages<'a>(
339    metadata: &cargo_metadata::Metadata,
340    options: &'a Options,
341) -> impl Iterator<Item = (PathBuf, Arc<Package>)> {
342    let packages = metadata.workspace_packages();
343    packages.into_iter().filter_map(move |package| {
344        let should_publish = package.publish.as_ref().is_none_or(|p| !p.is_empty());
345
346        let is_included = options
347            .include
348            .as_ref()
349            .is_none_or(|inc| inc.is_empty() || inc.contains(&package.name));
350
351        let is_excluded = options
352            .exclude
353            .as_ref()
354            .is_some_and(|excl| excl.contains(&package.name));
355
356        let should_publish = should_publish && is_included && !is_excluded;
357
358        let path: PathBuf = package.manifest_path.parent()?.into();
359        Some((
360            path.clone(),
361            Arc::new(Package {
362                inner: package.clone(),
363                path,
364                should_publish,
365                published: Mutex::new(false),
366                deps: RwLock::new(HashMap::new()),
367                dependants: RwLock::new(HashMap::new()),
368            }),
369        ))
370    })
371}
372
373async fn build_dag(
374    packages: &HashMap<PathBuf, Arc<Package>>,
375    options: &Options,
376) -> eyre::Result<()> {
377    let packages_iter = packages.values().filter(|p| p.should_publish);
378    let results: Vec<_> = stream::iter(packages_iter)
379        .map(|p| async move {
380            use toml_edit::{value, DocumentMut};
381            let manifest_path = &p.inner.manifest_path;
382            let manifest = tokio::fs::read_to_string(manifest_path).await?;
383            let mut manifest = manifest.parse::<DocumentMut>()?;
384            let mut need_update = false;
385
386            for dep in &p.inner.dependencies {
387                let mut dep_version = dep.req.clone();
388                if let Some(path) = dep.path.as_ref().map(PathBuf::from) {
389                    // also if the version is set, we want to resolve automatically?
390                    // OR we allow changing and always set allow-dirty
391                    // dep_version == semver::VersionReq::STAR &&
392                    let resolved = packages.get(&path).ok_or(eyre::eyre!(
393                        "{}: could not resolve local dependency {}",
394                        &p.inner.name,
395                        path.display()
396                    ))?;
397
398                    // ensure that all local dependencies for a package
399                    // that should be published are also going to
400                    // be published
401                    if !resolved.should_publish {
402                        eyre::bail!(
403                            "{}: cannot publish because dependency {} will not be published",
404                            &p.inner.name,
405                            &dep.name,
406                        );
407                    }
408
409                    if options.resolve_versions {
410                        // use version from the manifest the path points to
411                        dep_version = semver::VersionReq {
412                            comparators: vec![semver::Comparator {
413                                op: semver::Op::Exact,
414                                major: resolved.inner.version.major,
415                                minor: Some(resolved.inner.version.minor),
416                                patch: Some(resolved.inner.version.patch),
417                                pre: semver::Prerelease::EMPTY,
418                            }],
419                        };
420
421                        let changed = dep_version != dep.req;
422                        if changed {
423                            // update cargo manifest
424                            let section = match dep.kind {
425                                DependencyKind::Normal => Some("dependencies"),
426                                DependencyKind::Development => Some("dev-dependencies"),
427                                DependencyKind::Build => Some("build-dependencies"),
428                                _ => None,
429                            };
430                            if let Some(section) = section {
431                                manifest[section][&dep.name]["version"] =
432                                    value(dep_version.to_string());
433                                manifest[section][&dep.name]
434                                    .as_inline_table_mut()
435                                    .map(toml_edit::InlineTable::fmt);
436                                need_update = true;
437                            }
438                        }
439                    }
440
441                    p.deps
442                        .write()
443                        .unwrap()
444                        .insert(resolved.inner.name.clone(), resolved.clone());
445
446                    resolved
447                        .dependants
448                        .write()
449                        .unwrap()
450                        .insert(p.inner.name.clone(), p.clone());
451                }
452
453                let is_dev_dep = dep.kind == DependencyKind::Development;
454                let is_non_local_dep = !is_dev_dep || dep.path.is_none();
455                let is_missing_exact_version = dep_version == semver::VersionReq::STAR;
456
457                if is_missing_exact_version && is_non_local_dep {
458                    return Err(eyre::eyre!(
459                        "{}: dependency {} has no specific version ({})",
460                        &p.inner.name,
461                        &dep.name,
462                        dep_version
463                    ).suggestion("to automatically resolve versions of local workspace members, use '--resolve-versions'"));
464                }
465            }
466
467            // write updated cargo manifest
468            if !options.dry_run && need_update {
469                use tokio::io::AsyncWriteExt;
470                action::debug!("{}", &manifest.to_string());
471                action::info!("[{}@{}] updating {}", p.inner.name, p.inner.version, p.inner.manifest_path);
472                let mut f = tokio::fs::OpenOptions::new()
473                    .write(true)
474                    .truncate(true)
475                    .open(&p.inner.manifest_path)
476                    .await?;
477                f.write_all(manifest.to_string().as_bytes()).await?;
478            }
479
480            Ok(())
481        })
482        .buffer_unordered(8)
483        .collect()
484        .await;
485
486    // fail on error
487    results.into_iter().collect::<eyre::Result<Vec<_>>>()?;
488    Ok(())
489}
490
491/// Publishes packages of a project on crates.io.
492///
493/// # Errors
494/// If any package cannot be published.
495pub async fn publish(mut options: Options) -> eyre::Result<()> {
496    action::info!("searching cargo packages at {}", options.path.display());
497
498    let manifest_path = if options.path.is_file() {
499        options.path.clone()
500    } else {
501        options.path.join("Cargo.toml")
502    };
503    let metadata = cargo_metadata::MetadataCommand::new()
504        .manifest_path(&manifest_path)
505        .exec()?;
506
507    let packages: HashMap<PathBuf, Arc<Package>> = find_packages(&metadata, &options).collect();
508    build_dag(&packages, &options).await?;
509
510    action::info!(
511        "found packages: {:?}",
512        packages
513            .values()
514            .map(|p| p.inner.name.clone())
515            .collect::<Vec<_>>()
516    );
517
518    options.max_retries = Some(options.max_retries.unwrap_or(2 * packages.len()));
519    let options = Arc::new(options);
520
521    if packages.is_empty() {
522        // fast path: nothing to do here
523        return Ok(());
524    }
525    let mut ready: VecDeque<Arc<Package>> =
526        packages.values().filter(|p| p.ready()).cloned().collect();
527
528    let mut tasks: FuturesUnordered<Pin<Box<TaskFut>>> = FuturesUnordered::new();
529
530    let limit = options.concurrency_limit.unwrap_or(4);
531    let limit = Arc::new(Semaphore::new(limit));
532
533    loop {
534        // check if we are done
535        if tasks.is_empty() && ready.is_empty() {
536            break;
537        }
538
539        // start running ready tasks
540        loop {
541            // acquire permit
542            let Ok(permit) = limit.clone().try_acquire_owned() else {
543                break;
544            };
545            // check if we can publish
546            match ready.pop_front() {
547                Some(p) if !p.should_publish => {
548                    action::info!(
549                        "[{}@{}] skipping (publish=false)",
550                        p.inner.name,
551                        p.inner.version
552                    );
553                }
554                Some(p) => {
555                    tasks.push({
556                        let options = Arc::clone(&options);
557                        Box::pin(async move {
558                            let res = p.publish(options).await;
559
560                            // release permit
561                            drop(permit);
562                            res
563                        })
564                    });
565                }
566                // no more tasks
567                None => break,
568            }
569        }
570
571        // wait for a task to complete
572        match tasks.next().await {
573            Some(Err(err)) => {
574                eyre::bail!("a task failed: {}", err)
575            }
576            Some(Ok(completed)) => {
577                // update ready tasks
578                ready.extend(
579                    completed
580                        .dependants
581                        .read()
582                        .unwrap()
583                        .values()
584                        .filter(|d| d.ready() && !d.published())
585                        .cloned(),
586                );
587            }
588            None => {}
589        }
590    }
591
592    if !packages
593        .values()
594        .all(|p| !p.should_publish || p.published())
595    {
596        eyre::bail!("not all published");
597    }
598
599    Ok(())
600}
601
602/// Classification of publishing errors.
603#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
604pub enum PublishError {
605    Unknown,
606    Retryable(http::StatusCode),
607    Fatal(http::StatusCode),
608}
609
610impl PublishError {
611    pub fn code(&self) -> Option<&http::StatusCode> {
612        match self {
613            Self::Unknown => None,
614            Self::Retryable(code) | Self::Fatal(code) => Some(code),
615        }
616    }
617}
618
619/// Classify publish errors based on the error message.
620///
621/// This approach assumes that the error messages of `cargo publish` include network errors
622/// in the form `<code> <canonical_reason>`.
623///
624/// # Returns:
625/// - `Retryable(code)` if a temporary / intermittent error was detected
626/// - `Fatal(code)` if a fatal network error was detected (such as missing permissions)
627/// - `Unknown` otherwise
628fn classify_publish_error(text: &str) -> PublishError {
629    for code_num in 100u16..=599 {
630        let Ok(code) = http::StatusCode::from_u16(code_num) else {
631            continue;
632        };
633        let Some(reason) = code.canonical_reason() else {
634            continue;
635        };
636
637        let needle = format!("{} {}", code.as_str(), reason);
638        if text.contains(&needle) {
639            if code.is_redirection()
640                || code.is_server_error()
641                || code == http::StatusCode::NOT_FOUND
642                || code == http::StatusCode::REQUEST_TIMEOUT
643                || code == http::StatusCode::CONFLICT
644                || code == http::StatusCode::GONE
645                || code == http::StatusCode::PRECONDITION_FAILED
646                || code == http::StatusCode::RANGE_NOT_SATISFIABLE
647                || code == http::StatusCode::EXPECTATION_FAILED
648                || code == http::StatusCode::MISDIRECTED_REQUEST
649                || code == http::StatusCode::UNPROCESSABLE_ENTITY
650                || code == http::StatusCode::LOCKED
651                || code == http::StatusCode::FAILED_DEPENDENCY
652                || code == http::StatusCode::TOO_EARLY
653                || code == http::StatusCode::UPGRADE_REQUIRED
654                || code == http::StatusCode::PRECONDITION_REQUIRED
655                || code == http::StatusCode::TOO_MANY_REQUESTS
656                || code == http::StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS
657                || code == http::StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS
658            {
659                return PublishError::Retryable(code);
660            } else {
661                return PublishError::Fatal(code);
662            }
663        }
664    }
665
666    PublishError::Unknown
667}
668
669#[cfg(test)]
670mod tests {
671    use similar_asserts::assert_eq as sim_assert_eq;
672
673    #[test]
674    fn classify_publish_error() {
675        sim_assert_eq!(
676            super::classify_publish_error(
677                "the remote server responded with an error (status 429 Too Many Requests): You have published too many new crates in a short period of time. Please try again after Mon, 21 Apr 2025 19:31:32 GMT or email help@crates.io to have your limit increased."
678            ),
679            super::PublishError::Retryable(http::StatusCode::TOO_MANY_REQUESTS)
680        );
681
682        sim_assert_eq!(
683            super::classify_publish_error(
684                "the remote server responded with an error (status 500 Internal Server Error): Internal Server Error"
685            ),
686            super::PublishError::Retryable(http::StatusCode::INTERNAL_SERVER_ERROR)
687        );
688
689        sim_assert_eq!(
690            super::classify_publish_error(
691                "the remote server responded with some error we don't know more about"
692            ),
693            super::PublishError::Unknown,
694        );
695    }
696}