publish_crates/
lib.rs

1#![allow(clippy::missing_panics_doc)]
2
3use action_core as action;
4use cargo_metadata::DependencyKind;
5use color_eyre::{Section, eyre};
6use futures::Future;
7use futures::stream::{self, FuturesUnordered, StreamExt};
8use std::collections::{HashMap, VecDeque};
9use std::path::PathBuf;
10use std::pin::Pin;
11use std::sync::{Arc, Mutex, RwLock};
12use tokio::sync::Semaphore;
13use tokio::time::{Duration, Instant, interval, sleep};
14
/// Options for publishing packages.
///
/// All `Option` fields fall back to a built-in value when `None`;
/// the fallbacks are noted on the individual fields.
#[derive(Debug)]
pub struct Options {
    /// Path to package or workspace
    ///
    /// May point either to a manifest file or to a directory containing a `Cargo.toml`.
    pub path: PathBuf,

    /// Cargo registry token
    ///
    /// When set, it is passed to `cargo publish` via the `CARGO_REGISTRY_TOKEN`
    /// environment variable.
    pub registry_token: Option<String>,

    /// Perform dry-run
    /// This will perform all checks without publishing the package
    pub dry_run: bool,

    /// Delay before attempting to publish dependent crate
    ///
    /// Applied after a published package became available on the registry.
    /// Falls back to 30 seconds when `None`.
    pub publish_delay: Option<Duration>,

    /// Disable pre-publish validation checks
    ///
    /// Forwards `--no-verify` to `cargo publish`.
    pub no_verify: bool,

    /// Resolve missing versions for local packages.
    ///
    /// Versions of local packages that use `{ path = "../some/path" }`
    /// will be resolved to the version of the package the `path` is pointing to.
    /// Note that even if `version` is present, the resolved value will be used.
    ///
    /// **Note**: This will update your `Cargo.toml` manifest with the resolved version.
    pub resolve_versions: bool,

    /// Packages that should be published
    ///
    /// If using explicit include, specify all package names you wish to publish.
    /// `None` and an empty list both mean "include everything".
    pub include: Option<Vec<String>>,

    /// Packages that should not be published
    ///
    /// Excluded package names have precedence over included package names.
    pub exclude: Option<Vec<String>>,

    /// Maximum number of retries when encountering intermittent errors.
    ///
    /// Common intermittent failures are:
    /// - 500 Internal Server Error
    /// - 429 Too Many Requests
    ///
    /// Falls back to 10 when `None`.
    pub max_retries: Option<usize>,

    /// Maximum number of packages to publish concurrently.
    ///
    /// Falls back to 4 when `None`.
    pub concurrency_limit: Option<usize>,
}
63
/// A cargo package.
///
/// Packages form a dependency graph: `deps` and `dependants` link a package
/// to the other local packages it depends on / that depend on it.
struct Package {
    /// Package metadata as reported by `cargo metadata`.
    inner: cargo_metadata::Package,
    /// Directory containing the package manifest; used as working directory for cargo.
    path: PathBuf,
    /// Whether this package is selected for publishing.
    should_publish: bool,
    /// Set to `true` once the package has been published (or the dry-run completed).
    published: Mutex<bool>,
    /// Local packages this package depends on, keyed by package name.
    deps: RwLock<HashMap<String, Arc<Package>>>,
    /// Local packages that depend on this package, keyed by package name.
    dependants: RwLock<HashMap<String, Arc<Package>>>,
}
73
74impl std::fmt::Debug for Package {
75    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
76        write!(f, "{self}")
77    }
78}
79
80impl std::fmt::Display for Package {
81    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
82        f.debug_struct("Package")
83            .field("name", &self.inner.name)
84            .field("version", &self.inner.version.to_string())
85            .field(
86                "deps",
87                &self.deps.read().unwrap().keys().collect::<Vec<_>>(),
88            )
89            .field(
90                "dependants",
91                &self.dependants.read().unwrap().keys().collect::<Vec<_>>(),
92            )
93            .finish()
94    }
95}
96
97impl Package {
98    /// Returns `true` if the package has been successfully published.
99    pub fn published(&self) -> bool {
100        *self.published.lock().unwrap()
101    }
102
103    /// Checks if the package is ready for publishing.
104    ///
105    /// A package can be published if all its dependencies have been published.
106    pub fn ready(&self) -> bool {
107        self.deps.read().unwrap().values().all(|d| d.published())
108    }
109
110    /// Wait until the published package is available on the registry.
111    pub async fn is_available(&self) -> eyre::Result<bool> {
112        use crates_io_api::{AsyncClient, Error as RegistryError};
113        use semver::Version;
114
115        let api = AsyncClient::new(
116            "publish_crates (https://github.com/romnn/publish-crates)",
117            std::time::Duration::from_millis(1000),
118        )?;
119
120        let info = match api.get_crate(&self.inner.name).await {
121            Ok(info) => info,
122            Err(RegistryError::NotFound(_)) => return Ok(false),
123            Err(err) => return Err(err.into()),
124        };
125
126        let mut versions = info
127            .versions
128            .iter()
129            .filter_map(|v| match Version::parse(&v.num) {
130                Ok(version) => Some((version, v)),
131                Err(_) => None,
132            });
133        let Some((_, version)) = versions.find(|(ver, _)| ver == &self.inner.version) else {
134            return Ok(false);
135        };
136
137        let client = reqwest::Client::new();
138        let dl_response = client
139            .head(format!("https://crates.io{}", version.dl_path))
140            .send()
141            .await?;
142        Ok(dl_response.status() == reqwest::StatusCode::OK)
143    }
144
145    /// Wait until the published package is available on the registry.
146    pub async fn wait_package_available(
147        &self,
148        timeout: impl Into<Option<Duration>>,
149    ) -> eyre::Result<()> {
150        let timeout = timeout
151            .into()
152            .unwrap_or_else(|| Duration::from_secs(2 * 60));
153        let start = Instant::now();
154        let mut ticker = interval(Duration::from_secs(5));
155        loop {
156            ticker.tick().await;
157            action::info!(
158                "[{}@{}] checking if available",
159                self.inner.name,
160                self.inner.version,
161            );
162            if self.is_available().await? {
163                return Ok(());
164            }
165            // check for timeout
166            if Instant::now().duration_since(start) > timeout {
167                eyre::bail!(
168                    "exceeded timeout of {:?} waiting for crate {} {} to be published",
169                    timeout,
170                    self.inner.name,
171                    self.inner.version.to_string()
172                );
173            }
174        }
175    }
176
177    /// Publishes this package
178    pub async fn publish(self: Arc<Self>, options: Arc<Options>) -> eyre::Result<Arc<Self>> {
179        use async_process::Command;
180
181        action::info!("[{}@{}] publishing", self.inner.name, self.inner.version);
182
183        let mut cmd = Command::new("cargo");
184        cmd.arg("publish");
185
186        if options.no_verify {
187            cmd.arg("--no-verify");
188        }
189        cmd.current_dir(&self.path);
190        if let Some(ref token) = options.registry_token {
191            cmd.env("CARGO_REGISTRY_TOKEN", token);
192        }
193        if options.dry_run {
194            cmd.arg("--dry-run");
195            // skip checking if local package versions are available on crates.io as they are not
196            // published during dry-run
197            if options.resolve_versions && !self.deps.read().unwrap().is_empty() {
198                // cmd.arg("--offline");
199                // skip cargo dry-run as it will always fail
200                action::info!(
201                    "[{}@{}] dry-run: proceed without `cargo publish --dry-run` due to resolve version incompatibility",
202                    self.inner.name,
203                    self.inner.version
204                );
205                *self.published.lock().unwrap() = true;
206                return Ok(self);
207            }
208        }
209        if options.resolve_versions {
210            // when resolving versions, we may write to Cargo.toml
211            cmd.arg("--allow-dirty");
212        }
213
214        let max_retries = options.max_retries.unwrap_or(10);
215        let mut attempt = 0;
216        loop {
217            attempt += 1;
218
219            let output = cmd.output().await?;
220            let stdout = String::from_utf8_lossy(&output.stdout);
221            let stderr = String::from_utf8_lossy(&output.stderr);
222            action::debug!("{}", &stdout);
223            action::debug!("{}", &stderr);
224
225            if !output.status.success() {
226                action::warning!("{}", &stdout);
227                action::warning!("{}", &stderr);
228
229                // let is_too_many_requests = stderr.contains("429 Too Many Requests");
230                // let is_internal_server_error = stderr.contains("500 Internal Server Error");
231                // let is_intermittent_failure = is_too_many_requests || is_internal_server_error;
232                //
233                // let mut wait_duration = std::time::Duration::from_secs(1 * 60);
234                //
235                // if is_too_many_requests {
236                //     action::warning!(
237                //         "[{}@{}] intermittent failure: 429 Too Many Requests",
238                //         self.inner.name,
239                //         self.inner.version
240                //     );
241                //     wait_duration = std::time::Duration::from_secs(10 * 60);
242                // }
243
244                if stderr.contains("already exists on crates.io index") {
245                    break;
246                }
247
248                let error = classify_publish_error(&stderr);
249                let wait_duration = match error {
250                    PublishError::Fatal(_) => {
251                        eyre::bail!("command {:?} failed: {}", cmd, stderr);
252                    }
253                    PublishError::Retryable(code) => {
254                        action::warning!(
255                            "[{}@{}] intermittent failure: {} {}",
256                            self.inner.name,
257                            self.inner.version,
258                            code.as_u16(),
259                            code.canonical_reason().unwrap_or_default(),
260                        );
261                        match code {
262                            http::StatusCode::TOO_MANY_REQUESTS => {
263                                // 10 minutes
264                                std::time::Duration::from_secs(10 * 60)
265                            }
266                            _ => std::time::Duration::from_secs(5 * 60),
267                        }
268                    }
269                    PublishError::Unknown => {
270                        action::warning!(
271                            "[{}@{}] unknown failure",
272                            self.inner.name,
273                            self.inner.version,
274                        );
275                        // 5 minutes
276                        std::time::Duration::from_secs(5 * 60)
277                    }
278                };
279
280                if attempt >= max_retries {
281                    eyre::bail!("command {:?} failed: {}", cmd, stderr);
282                }
283
284                action::warning!(
285                    "[{}@{}] attempting again in {wait_duration:?}",
286                    self.inner.name,
287                    self.inner.version
288                );
289                sleep(wait_duration).await;
290            } else {
291                break;
292            }
293        }
294
295        if options.dry_run {
296            action::info!(
297                "[{}@{}] dry-run: skip waiting for successful publish",
298                &self.inner.name,
299                self.inner.version
300            );
301            *self.published.lock().unwrap() = true;
302            return Ok(self);
303        }
304
305        // wait for package to be available on the registry
306        self.wait_package_available(None).await?;
307
308        sleep(
309            options
310                .publish_delay
311                .unwrap_or_else(|| Duration::from_secs(30)),
312        )
313        .await;
314
315        let mut cmd = Command::new("cargo");
316        cmd.arg("update");
317        cmd.current_dir(&self.path);
318        let output = cmd.output().await?;
319        if !output.status.success() {
320            eyre::bail!("command {:?} failed", cmd);
321        }
322
323        *self.published.lock().unwrap() = true;
324        action::info!(
325            "[{}@{}] published successfully",
326            self.inner.name,
327            self.inner.version
328        );
329
330        Ok(self)
331    }
332}
333
/// A publish task: a future resolving to the package it published, or to the error that occurred.
type TaskFut = dyn Future<Output = eyre::Result<Arc<Package>>>;
335
336fn find_packages(
337    metadata: &cargo_metadata::Metadata,
338    options: Arc<Options>,
339) -> impl Iterator<Item = (PathBuf, Arc<Package>)> + '_ {
340    let packages = metadata.workspace_packages();
341    packages.into_iter().filter_map(move |package| {
342        let should_publish = package.publish.as_ref().is_none_or(|p| !p.is_empty());
343
344        let is_included = options
345            .include
346            .as_ref()
347            .is_none_or(|inc| inc.is_empty() || inc.contains(&package.name));
348
349        let is_excluded = options
350            .exclude
351            .as_ref()
352            .is_some_and(|excl| excl.contains(&package.name));
353
354        let should_publish = should_publish && is_included && !is_excluded;
355
356        let path: PathBuf = package.manifest_path.parent()?.into();
357        Some((
358            path.clone(),
359            Arc::new(Package {
360                inner: package.clone(),
361                path,
362                should_publish,
363                published: Mutex::new(false),
364                deps: RwLock::new(HashMap::new()),
365                dependants: RwLock::new(HashMap::new()),
366            }),
367        ))
368    })
369}
370
371async fn build_dag(
372    packages: Arc<HashMap<PathBuf, Arc<Package>>>,
373    options: Arc<Options>,
374) -> eyre::Result<()> {
375    let packages_iter = packages.values().filter(|p| p.should_publish);
376    let results: Vec<_> = stream::iter(packages_iter)
377        .map(|p| {
378            let packages = packages.clone();
379            let options = options.clone();
380            async move {
381                use toml_edit::{value, DocumentMut};
382                let manifest_path = &p.inner.manifest_path;
383                let manifest = tokio::fs::read_to_string(manifest_path).await?;
384                let mut manifest = manifest.parse::<DocumentMut>()?;
385                let mut need_update = false;
386
387                for dep in &p.inner.dependencies {
388                    // dbg!(&dep);
389                    let mut dep_version = dep.req.clone();
390                    if let Some(path) = dep.path.as_ref().map(PathBuf::from) {
391                        // also if the version is set, we want to resolve automatically?
392                        // OR we allow changing and always set allow-dirty
393                        // dep_version == semver::VersionReq::STAR &&
394                        let resolved = packages.get(&path).ok_or(eyre::eyre!(
395                            "{}: could not resolve local dependency {}",
396                            &p.inner.name,
397                            path.display()
398                        ))?;
399
400                        // ensure that all local dependencies for a package
401                        // that should be published are also going to
402                        // be published
403                        if !resolved.should_publish {
404                            eyre::bail!(
405                                "{}: cannot publish because dependency {} will not be published",
406                                &p.inner.name,
407                                &dep.name,
408                            );
409                        }
410
411                        if options.resolve_versions {
412                            // use version from the manifest the path points to
413                            dep_version = semver::VersionReq {
414                                comparators: vec![semver::Comparator {
415                                    op: semver::Op::Exact,
416                                    major: resolved.inner.version.major,
417                                    minor: Some(resolved.inner.version.minor),
418                                    patch: Some(resolved.inner.version.patch),
419                                    pre: semver::Prerelease::EMPTY,
420                                }],
421                            };
422
423                            let changed = dep_version != dep.req;
424                            if changed {
425                                // update cargo manifest
426                                if let Some(kind) = match dep.kind {
427                                    DependencyKind::Normal => Some("dependencies"),
428                                    DependencyKind::Development => Some("dev-dependencies"),
429                                    DependencyKind::Build => Some("build-dependencies"),
430                                    _ => None,
431                                } {
432                                    // TODO: !!!! do not remove the path thing here!
433                                    manifest[kind][&dep.name]["version"] =
434                                        value(dep_version.to_string());
435                                    manifest[kind][&dep.name]
436                                        .as_inline_table_mut()
437                                        .map(toml_edit::InlineTable::fmt);
438                                    need_update = true;
439                                }
440                            }
441                        }
442
443                        p.deps
444                            .write()
445                            .unwrap()
446                            .insert(resolved.inner.name.clone(), resolved.clone());
447
448                        resolved
449                            .dependants
450                            .write()
451                            .unwrap()
452                            .insert(p.inner.name.clone(), p.clone());
453                    }
454
455                    let is_dev_dep = dep.kind == DependencyKind::Development;
456                    let is_non_local_dep = !is_dev_dep || dep.path.is_none();
457                    let is_missing_exact_version = dep_version == semver::VersionReq::STAR;
458
459                    if is_missing_exact_version && is_non_local_dep {
460                        return Err(eyre::eyre!(
461                            "{}: dependency {} has no specific version ({})",
462                            &p.inner.name,
463                            &dep.name,
464                            dep_version
465                        ).suggestion("to automatically resolve versions of local workspace members, use '--resolve-versions'"));
466                    }
467                }
468
469                // write updated cargo manifest
470                if !options.dry_run && need_update {
471                    use tokio::io::AsyncWriteExt;
472                    action::debug!("{}", &manifest.to_string());
473                    action::info!("[{}@{}] updating {}", p.inner.name, p.inner.version, p.inner.manifest_path);
474                    let mut f = tokio::fs::OpenOptions::new()
475                        .write(true)
476                        .truncate(true)
477                        .open(&p.inner.manifest_path)
478                        .await?;
479                    f.write_all(manifest.to_string().as_bytes()).await?;
480                }
481
482                Ok(())
483            }
484        })
485        .buffer_unordered(8)
486        .collect()
487        .await;
488
489    // fail on error
490    results.into_iter().collect::<eyre::Result<Vec<_>>>()?;
491    Ok(())
492}
493
494/// Publishes packages of a project on crates.io.
495///
496/// # Errors
497/// If any package cannot be published.
498pub async fn publish(options: Arc<Options>) -> eyre::Result<()> {
499    action::info!("searching cargo packages at {}", options.path.display());
500
501    let manifest_path = if options.path.is_file() {
502        options.path.clone()
503    } else {
504        options.path.join("Cargo.toml")
505    };
506    let metadata = cargo_metadata::MetadataCommand::new()
507        .manifest_path(&manifest_path)
508        .exec()?;
509
510    let packages: Arc<HashMap<PathBuf, Arc<Package>>> =
511        Arc::new(find_packages(&metadata, options.clone()).collect::<HashMap<_, _>>());
512
513    build_dag(packages.clone(), options.clone()).await?;
514
515    action::info!(
516        "found packages: {:?}",
517        packages
518            .values()
519            .map(|p| p.inner.name.clone())
520            .collect::<Vec<_>>()
521    );
522
523    if packages.is_empty() {
524        // fast path: nothing to do here
525        return Ok(());
526    }
527    let mut ready: VecDeque<Arc<Package>> =
528        packages.values().filter(|p| p.ready()).cloned().collect();
529
530    let mut tasks: FuturesUnordered<Pin<Box<TaskFut>>> = FuturesUnordered::new();
531
532    let limit = options.concurrency_limit.unwrap_or(4);
533    let limit = Arc::new(Semaphore::new(limit));
534
535    loop {
536        // check if we are done
537        if tasks.is_empty() && ready.is_empty() {
538            break;
539        }
540
541        // start running ready tasks
542        loop {
543            let Ok(permit) = limit.clone().try_acquire_owned() else {
544                break;
545            };
546            // check if we can publish
547            match ready.pop_front() {
548                Some(p) if !p.should_publish => {
549                    action::info!(
550                        "[{}@{}] skipping (publish=false)",
551                        p.inner.name,
552                        p.inner.version
553                    );
554                }
555                Some(p) => {
556                    let options_clone = options.clone();
557                    tasks.push(Box::pin(async move {
558                        let res = p.publish(options_clone).await;
559                        drop(permit);
560                        res
561                    }));
562                }
563                // no more tasks
564                None => break,
565            }
566        }
567
568        // wait for a task to complete
569        match tasks.next().await {
570            Some(Err(err)) => {
571                eyre::bail!("a task failed: {}", err)
572            }
573            Some(Ok(completed)) => {
574                // update ready tasks
575                ready.extend(
576                    completed
577                        .dependants
578                        .read()
579                        .unwrap()
580                        .values()
581                        .filter(|d| d.ready() && !d.published())
582                        .cloned(),
583                );
584            }
585            None => {}
586        }
587    }
588
589    if !packages
590        .values()
591        .all(|p| !p.should_publish || p.published())
592    {
593        eyre::bail!("not all published");
594    }
595
596    Ok(())
597}
598
/// Classification of publishing errors.
///
/// Produced by `classify_publish_error` from the error output of `cargo publish`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PublishError {
    /// No known `<code> <canonical_reason>` pattern was found in the error output.
    Unknown,
    /// A status code was found that is treated as temporary; publishing may be retried.
    Retryable(http::StatusCode),
    /// A status code was found that is treated as permanent; retrying is pointless.
    Fatal(http::StatusCode),
}
606
607impl PublishError {
608    pub fn code(&self) -> Option<&http::StatusCode> {
609        match self {
610            Self::Unknown => None,
611            Self::Retryable(code) | Self::Fatal(code) => Some(code),
612        }
613    }
614}
615
616/// Classify publish errors based on the error message.
617///
618/// This approach assumes that the error messages of `cargo publish` include network errors
619/// in the form `<code> <canonical_reason>`.
620///
621/// # Returns:
622/// - `Retryable(code)` if a temporary / intermittent error was detected
623/// - `Fatal(code)` if a fatal network error was detected (such as missing permissions)
624/// - `Unknown` otherwise
625fn classify_publish_error(text: &str) -> PublishError {
626    for code_num in 100u16..=599 {
627        let Ok(code) = http::StatusCode::from_u16(code_num) else {
628            continue;
629        };
630        let Some(reason) = code.canonical_reason() else {
631            continue;
632        };
633
634        let needle = format!("{} {}", code.as_str(), reason);
635        if text.contains(&needle) {
636            if code.is_redirection()
637                || code.is_server_error()
638                || code == http::StatusCode::NOT_FOUND
639                || code == http::StatusCode::REQUEST_TIMEOUT
640                || code == http::StatusCode::CONFLICT
641                || code == http::StatusCode::GONE
642                || code == http::StatusCode::PRECONDITION_FAILED
643                || code == http::StatusCode::RANGE_NOT_SATISFIABLE
644                || code == http::StatusCode::EXPECTATION_FAILED
645                || code == http::StatusCode::MISDIRECTED_REQUEST
646                || code == http::StatusCode::UNPROCESSABLE_ENTITY
647                || code == http::StatusCode::LOCKED
648                || code == http::StatusCode::FAILED_DEPENDENCY
649                || code == http::StatusCode::TOO_EARLY
650                || code == http::StatusCode::UPGRADE_REQUIRED
651                || code == http::StatusCode::PRECONDITION_REQUIRED
652                || code == http::StatusCode::TOO_MANY_REQUESTS
653                || code == http::StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS
654                || code == http::StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS
655            {
656                return PublishError::Retryable(code);
657            } else {
658                return PublishError::Fatal(code);
659            }
660        }
661    }
662
663    PublishError::Unknown
664}
665
#[cfg(test)]
mod tests {
    use super::{PublishError, classify_publish_error as classify};
    use similar_asserts::assert_eq as sim_assert_eq;

    /// Known status lines are classified by their code, anything else is `Unknown`.
    #[test]
    fn classify_publish_error() {
        let cases = [
            (
                "the remote server responded with an error (status 429 Too Many Requests): You have published too many new crates in a short period of time. Please try again after Mon, 21 Apr 2025 19:31:32 GMT or email help@crates.io to have your limit increased.",
                PublishError::Retryable(http::StatusCode::TOO_MANY_REQUESTS),
            ),
            (
                "the remote server responded with an error (status 500 Internal Server Error): Internal Server Error",
                PublishError::Retryable(http::StatusCode::INTERNAL_SERVER_ERROR),
            ),
            (
                "the remote server responded with some error we don't know more about",
                PublishError::Unknown,
            ),
        ];

        for (stderr, expected) in cases {
            sim_assert_eq!(classify(stderr), expected);
        }
    }
}
693}