1use action_core as action;
2use cargo_metadata::DependencyKind;
3use color_eyre::{Section, eyre};
4use futures::Future;
5use futures::stream::{self, FuturesUnordered, StreamExt};
6use std::collections::{HashMap, VecDeque};
7use std::path::PathBuf;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex, RwLock};
10use tokio::sync::Semaphore;
11use tokio::time::{Duration, Instant, interval, sleep};
12
/// Clock-time format (`hour:minute:second`) used when logging the point in
/// time at which the next publish attempt will be made.
const DATETIME_FORMAT: &'static [time::format_description::BorrowedFormatItem<'static>] =
    time::macros::format_description!("[hour]:[minute]:[second]");
15
/// Settings that control how the packages of a workspace are published.
#[derive(Debug)]
pub struct Options {
    /// Location of the workspace: either a manifest file, or a directory to
    /// which `Cargo.toml` is appended.
    pub path: PathBuf,

    /// Token passed to `cargo publish` through the `CARGO_REGISTRY_TOKEN`
    /// environment variable, when set.
    pub registry_token: Option<String>,

    /// Pass `--dry-run` to `cargo publish`, do not rewrite manifests and do
    /// not wait for packages to become available on the registry.
    pub dry_run: bool,

    /// Pause after a package became available and before `cargo update` is
    /// run; 30 seconds when `None`.
    pub publish_delay: Option<Duration>,

    /// Pass `--no-verify` to `cargo publish`.
    pub no_verify: bool,

    /// Replace the version requirement of local path dependencies with the
    /// exact version of the workspace member they point to. Also passes
    /// `--allow-dirty` to `cargo publish`.
    pub resolve_versions: bool,

    /// Names of the packages to publish; `None` or an empty list selects
    /// every package.
    pub include: Option<Vec<String>>,

    /// Names of packages that must not be published.
    pub exclude: Option<Vec<String>>,

    /// Upper bound on the number of `cargo publish` attempts per package.
    /// When `None`, the top-level `publish` substitutes twice the number of
    /// packages; `Package::publish` on its own falls back to 10.
    pub max_retries: Option<usize>,

    /// Number of publish tasks that may be in flight at once; 4 when `None`.
    pub concurrency_limit: Option<usize>,
}
64
/// A workspace member together with its publish state and its edges in the
/// dependency graph of local packages.
struct Package {
    /// Package description as obtained from `cargo metadata`.
    inner: cargo_metadata::Package,
    /// Directory containing the package manifest; cargo commands for this
    /// package run from here.
    path: PathBuf,
    /// Whether the package is selected for publishing (manifest `publish`
    /// setting combined with the include/exclude options).
    should_publish: bool,
    /// Set to `true` once the package counts as published.
    published: Mutex<bool>,
    /// Local packages this package depends on, keyed by package name.
    deps: RwLock<HashMap<String, Arc<Package>>>,
    /// Local packages that depend on this package, keyed by package name.
    dependants: RwLock<HashMap<String, Arc<Package>>>,
}
74
75impl std::fmt::Debug for Package {
76 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
77 write!(f, "{self}")
78 }
79}
80
81impl std::fmt::Display for Package {
82 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
83 f.debug_struct("Package")
84 .field("name", &self.inner.name)
85 .field("version", &self.inner.version.to_string())
86 .field(
87 "deps",
88 &self.deps.read().unwrap().keys().collect::<Vec<_>>(),
89 )
90 .field(
91 "dependants",
92 &self.dependants.read().unwrap().keys().collect::<Vec<_>>(),
93 )
94 .finish()
95 }
96}
97
98impl Package {
99 pub fn published(&self) -> bool {
101 *self.published.lock().unwrap()
102 }
103
104 pub fn ready(&self) -> bool {
108 self.deps.read().unwrap().values().all(|d| d.published())
109 }
110
111 pub async fn is_available(&self) -> eyre::Result<bool> {
113 use crates_io_api::{AsyncClient, Error as RegistryError};
114 use semver::Version;
115
116 let api = AsyncClient::new(
117 "publish_crates (https://github.com/romnn/publish-crates)",
118 std::time::Duration::from_millis(1000),
119 )?;
120
121 let info = match api.get_crate(&self.inner.name).await {
122 Ok(info) => info,
123 Err(RegistryError::NotFound(_)) => return Ok(false),
124 Err(err) => return Err(err.into()),
125 };
126
127 let mut versions = info
128 .versions
129 .iter()
130 .filter_map(|v| match Version::parse(&v.num) {
131 Ok(version) => Some((version, v)),
132 Err(_) => None,
133 });
134 let Some((_, version)) = versions.find(|(ver, _)| ver == &self.inner.version) else {
135 return Ok(false);
136 };
137
138 let client = reqwest::Client::new();
139 let dl_response = client
140 .head(format!("https://crates.io{}", version.dl_path))
141 .send()
142 .await?;
143 Ok(dl_response.status() == reqwest::StatusCode::OK)
144 }
145
146 pub async fn wait_package_available(
148 &self,
149 timeout: impl Into<Option<Duration>>,
150 ) -> eyre::Result<()> {
151 let timeout = timeout
152 .into()
153 .unwrap_or_else(|| Duration::from_secs(2 * 60));
154 let start = Instant::now();
155 let mut ticker = interval(Duration::from_secs(5));
156 loop {
157 ticker.tick().await;
158 action::info!(
159 "[{}@{}] checking if available",
160 self.inner.name,
161 self.inner.version,
162 );
163 if self.is_available().await? {
164 return Ok(());
165 }
166 if Instant::now().duration_since(start) > timeout {
168 eyre::bail!(
169 "exceeded timeout of {:?} waiting for crate {} {} to be published",
170 timeout,
171 self.inner.name,
172 self.inner.version.to_string()
173 );
174 }
175 }
176 }
177
178 pub async fn publish(self: Arc<Self>, options: Arc<Options>) -> eyre::Result<Arc<Self>> {
180 use async_process::Command;
181
182 action::info!("[{}@{}] publishing", self.inner.name, self.inner.version);
183
184 let mut cmd = Command::new("cargo");
185 cmd.arg("publish");
186
187 if options.no_verify {
188 cmd.arg("--no-verify");
189 }
190 cmd.current_dir(&self.path);
191 if let Some(ref token) = options.registry_token {
192 cmd.env("CARGO_REGISTRY_TOKEN", token);
193 }
194 if options.dry_run {
195 cmd.arg("--dry-run");
196 if options.resolve_versions && !self.deps.read().unwrap().is_empty() {
199 action::info!(
202 "[{}@{}] dry-run: proceed without `cargo publish --dry-run` due to resolve version incompatibility",
203 self.inner.name,
204 self.inner.version
205 );
206 *self.published.lock().unwrap() = true;
207 return Ok(self);
208 }
209 }
210 if options.resolve_versions {
211 cmd.arg("--allow-dirty");
213 }
214
215 let max_retries = options.max_retries.unwrap_or(10);
216 let mut attempt = 0;
217
218 loop {
219 attempt += 1;
220
221 if attempt > 1 {
222 action::warning!(
223 "[{}@{}] publishing (attempt {}/{})",
224 self.inner.name,
225 self.inner.version,
226 attempt,
227 max_retries
228 );
229 }
230
231 let output = cmd.output().await?;
232 let stdout = String::from_utf8_lossy(&output.stdout);
233 let stderr = String::from_utf8_lossy(&output.stderr);
234 action::debug!("{}", &stdout);
235 action::debug!("{}", &stderr);
236
237 if !output.status.success() {
238 action::warning!("{}", &stdout);
239 action::warning!("{}", &stderr);
240
241 if stderr.contains("already exists on crates.io index") {
242 break;
243 }
244
245 let error = classify_publish_error(&stderr);
246 let wait_duration = match error {
247 PublishError::Fatal(_) => {
248 eyre::bail!("command {:?} failed: {}", cmd, stderr);
249 }
250 PublishError::Retryable(code) => {
251 action::warning!(
252 "[{}@{}] intermittent failure: {} {}",
253 self.inner.name,
254 self.inner.version,
255 code.as_u16(),
256 code.canonical_reason().unwrap_or_default(),
257 );
258 match code {
259 http::StatusCode::TOO_MANY_REQUESTS => {
260 std::time::Duration::from_secs(10 * 60)
262 }
263 _ => std::time::Duration::from_secs(5 * 60),
265 }
266 }
267 PublishError::Unknown => {
268 action::warning!(
269 "[{}@{}] unknown failure",
270 self.inner.name,
271 self.inner.version,
272 );
273 std::time::Duration::from_secs(5 * 60)
275 }
276 };
277
278 if attempt >= max_retries {
279 eyre::bail!("command {:?} failed: {}", cmd, stderr);
280 }
281
282 let next_attempt = std::time::SystemTime::now() + wait_duration;
283 action::warning!(
284 "[{}@{}] attempting again in {wait_duration:?} at {}",
285 self.inner.name,
286 self.inner.version,
287 time::OffsetDateTime::from(next_attempt)
288 .format(DATETIME_FORMAT)
289 .unwrap_or_else(|_| humantime::format_rfc3339(next_attempt).to_string())
290 );
291 sleep(wait_duration).await;
292 } else {
293 break;
294 }
295 }
296
297 if options.dry_run {
298 action::info!(
299 "[{}@{}] dry-run: skip waiting for successful publish",
300 &self.inner.name,
301 self.inner.version
302 );
303 *self.published.lock().unwrap() = true;
304 return Ok(self);
305 }
306
307 self.wait_package_available(None).await?;
309
310 sleep(
311 options
312 .publish_delay
313 .unwrap_or_else(|| Duration::from_secs(30)),
314 )
315 .await;
316
317 let mut cmd = Command::new("cargo");
318 cmd.arg("update");
319 cmd.current_dir(&self.path);
320 let output = cmd.output().await?;
321 if !output.status.success() {
322 eyre::bail!("command {:?} failed", cmd);
323 }
324
325 *self.published.lock().unwrap() = true;
326 action::info!(
327 "[{}@{}] published successfully",
328 self.inner.name,
329 self.inner.version
330 );
331
332 Ok(self)
333 }
334}
335
/// Future type of a single publish task: it resolves to the package that was
/// published, or to the error that made publishing fail.
type TaskFut = dyn Future<Output = eyre::Result<Arc<Package>>>;
337
338fn find_packages<'a>(
339 metadata: &cargo_metadata::Metadata,
340 options: &'a Options,
341) -> impl Iterator<Item = (PathBuf, Arc<Package>)> {
342 let packages = metadata.workspace_packages();
343 packages.into_iter().filter_map(move |package| {
344 let should_publish = package.publish.as_ref().is_none_or(|p| !p.is_empty());
345
346 let is_included = options
347 .include
348 .as_ref()
349 .is_none_or(|inc| inc.is_empty() || inc.contains(&package.name));
350
351 let is_excluded = options
352 .exclude
353 .as_ref()
354 .is_some_and(|excl| excl.contains(&package.name));
355
356 let should_publish = should_publish && is_included && !is_excluded;
357
358 let path: PathBuf = package.manifest_path.parent()?.into();
359 Some((
360 path.clone(),
361 Arc::new(Package {
362 inner: package.clone(),
363 path,
364 should_publish,
365 published: Mutex::new(false),
366 deps: RwLock::new(HashMap::new()),
367 dependants: RwLock::new(HashMap::new()),
368 }),
369 ))
370 })
371}
372
373async fn build_dag(
374 packages: &HashMap<PathBuf, Arc<Package>>,
375 options: &Options,
376) -> eyre::Result<()> {
377 let packages_iter = packages.values().filter(|p| p.should_publish);
378 let results: Vec<_> = stream::iter(packages_iter)
379 .map(|p| async move {
380 use toml_edit::{value, DocumentMut};
381 let manifest_path = &p.inner.manifest_path;
382 let manifest = tokio::fs::read_to_string(manifest_path).await?;
383 let mut manifest = manifest.parse::<DocumentMut>()?;
384 let mut need_update = false;
385
386 for dep in &p.inner.dependencies {
387 let mut dep_version = dep.req.clone();
388 if let Some(path) = dep.path.as_ref().map(PathBuf::from) {
389 let resolved = packages.get(&path).ok_or(eyre::eyre!(
393 "{}: could not resolve local dependency {}",
394 &p.inner.name,
395 path.display()
396 ))?;
397
398 if !resolved.should_publish {
402 eyre::bail!(
403 "{}: cannot publish because dependency {} will not be published",
404 &p.inner.name,
405 &dep.name,
406 );
407 }
408
409 if options.resolve_versions {
410 dep_version = semver::VersionReq {
412 comparators: vec![semver::Comparator {
413 op: semver::Op::Exact,
414 major: resolved.inner.version.major,
415 minor: Some(resolved.inner.version.minor),
416 patch: Some(resolved.inner.version.patch),
417 pre: semver::Prerelease::EMPTY,
418 }],
419 };
420
421 let changed = dep_version != dep.req;
422 if changed {
423 let section = match dep.kind {
425 DependencyKind::Normal => Some("dependencies"),
426 DependencyKind::Development => Some("dev-dependencies"),
427 DependencyKind::Build => Some("build-dependencies"),
428 _ => None,
429 };
430 if let Some(section) = section {
431 manifest[section][&dep.name]["version"] =
432 value(dep_version.to_string());
433 manifest[section][&dep.name]
434 .as_inline_table_mut()
435 .map(toml_edit::InlineTable::fmt);
436 need_update = true;
437 }
438 }
439 }
440
441 p.deps
442 .write()
443 .unwrap()
444 .insert(resolved.inner.name.clone(), resolved.clone());
445
446 resolved
447 .dependants
448 .write()
449 .unwrap()
450 .insert(p.inner.name.clone(), p.clone());
451 }
452
453 let is_dev_dep = dep.kind == DependencyKind::Development;
454 let is_non_local_dep = !is_dev_dep || dep.path.is_none();
455 let is_missing_exact_version = dep_version == semver::VersionReq::STAR;
456
457 if is_missing_exact_version && is_non_local_dep {
458 return Err(eyre::eyre!(
459 "{}: dependency {} has no specific version ({})",
460 &p.inner.name,
461 &dep.name,
462 dep_version
463 ).suggestion("to automatically resolve versions of local workspace members, use '--resolve-versions'"));
464 }
465 }
466
467 if !options.dry_run && need_update {
469 use tokio::io::AsyncWriteExt;
470 action::debug!("{}", &manifest.to_string());
471 action::info!("[{}@{}] updating {}", p.inner.name, p.inner.version, p.inner.manifest_path);
472 let mut f = tokio::fs::OpenOptions::new()
473 .write(true)
474 .truncate(true)
475 .open(&p.inner.manifest_path)
476 .await?;
477 f.write_all(manifest.to_string().as_bytes()).await?;
478 }
479
480 Ok(())
481 })
482 .buffer_unordered(8)
483 .collect()
484 .await;
485
486 results.into_iter().collect::<eyre::Result<Vec<_>>>()?;
488 Ok(())
489}
490
491pub async fn publish(mut options: Options) -> eyre::Result<()> {
496 action::info!("searching cargo packages at {}", options.path.display());
497
498 let manifest_path = if options.path.is_file() {
499 options.path.clone()
500 } else {
501 options.path.join("Cargo.toml")
502 };
503 let metadata = cargo_metadata::MetadataCommand::new()
504 .manifest_path(&manifest_path)
505 .exec()?;
506
507 let packages: HashMap<PathBuf, Arc<Package>> = find_packages(&metadata, &options).collect();
508 build_dag(&packages, &options).await?;
509
510 action::info!(
511 "found packages: {:?}",
512 packages
513 .values()
514 .map(|p| p.inner.name.clone())
515 .collect::<Vec<_>>()
516 );
517
518 options.max_retries = Some(options.max_retries.unwrap_or(2 * packages.len()));
519 let options = Arc::new(options);
520
521 if packages.is_empty() {
522 return Ok(());
524 }
525 let mut ready: VecDeque<Arc<Package>> =
526 packages.values().filter(|p| p.ready()).cloned().collect();
527
528 let mut tasks: FuturesUnordered<Pin<Box<TaskFut>>> = FuturesUnordered::new();
529
530 let limit = options.concurrency_limit.unwrap_or(4);
531 let limit = Arc::new(Semaphore::new(limit));
532
533 loop {
534 if tasks.is_empty() && ready.is_empty() {
536 break;
537 }
538
539 loop {
541 let Ok(permit) = limit.clone().try_acquire_owned() else {
543 break;
544 };
545 match ready.pop_front() {
547 Some(p) if !p.should_publish => {
548 action::info!(
549 "[{}@{}] skipping (publish=false)",
550 p.inner.name,
551 p.inner.version
552 );
553 }
554 Some(p) => {
555 tasks.push({
556 let options = Arc::clone(&options);
557 Box::pin(async move {
558 let res = p.publish(options).await;
559
560 drop(permit);
562 res
563 })
564 });
565 }
566 None => break,
568 }
569 }
570
571 match tasks.next().await {
573 Some(Err(err)) => {
574 eyre::bail!("a task failed: {}", err)
575 }
576 Some(Ok(completed)) => {
577 ready.extend(
579 completed
580 .dependants
581 .read()
582 .unwrap()
583 .values()
584 .filter(|d| d.ready() && !d.published())
585 .cloned(),
586 );
587 }
588 None => {}
589 }
590 }
591
592 if !packages
593 .values()
594 .all(|p| !p.should_publish || p.published())
595 {
596 eyre::bail!("not all published");
597 }
598
599 Ok(())
600}
601
/// Classification of a failed `cargo publish` invocation, derived from the
/// HTTP status found in its error output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PublishError {
    /// No known HTTP status line was found in the output.
    Unknown,
    /// The status indicates a failure for which publishing is attempted again.
    Retryable(http::StatusCode),
    /// The status indicates a failure for which publishing is aborted.
    Fatal(http::StatusCode),
}
609
610impl PublishError {
611 pub fn code(&self) -> Option<&http::StatusCode> {
612 match self {
613 Self::Unknown => None,
614 Self::Retryable(code) | Self::Fatal(code) => Some(code),
615 }
616 }
617}
618
619fn classify_publish_error(text: &str) -> PublishError {
629 for code_num in 100u16..=599 {
630 let Ok(code) = http::StatusCode::from_u16(code_num) else {
631 continue;
632 };
633 let Some(reason) = code.canonical_reason() else {
634 continue;
635 };
636
637 let needle = format!("{} {}", code.as_str(), reason);
638 if text.contains(&needle) {
639 if code.is_redirection()
640 || code.is_server_error()
641 || code == http::StatusCode::NOT_FOUND
642 || code == http::StatusCode::REQUEST_TIMEOUT
643 || code == http::StatusCode::CONFLICT
644 || code == http::StatusCode::GONE
645 || code == http::StatusCode::PRECONDITION_FAILED
646 || code == http::StatusCode::RANGE_NOT_SATISFIABLE
647 || code == http::StatusCode::EXPECTATION_FAILED
648 || code == http::StatusCode::MISDIRECTED_REQUEST
649 || code == http::StatusCode::UNPROCESSABLE_ENTITY
650 || code == http::StatusCode::LOCKED
651 || code == http::StatusCode::FAILED_DEPENDENCY
652 || code == http::StatusCode::TOO_EARLY
653 || code == http::StatusCode::UPGRADE_REQUIRED
654 || code == http::StatusCode::PRECONDITION_REQUIRED
655 || code == http::StatusCode::TOO_MANY_REQUESTS
656 || code == http::StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS
657 || code == http::StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS
658 {
659 return PublishError::Retryable(code);
660 } else {
661 return PublishError::Fatal(code);
662 }
663 }
664 }
665
666 PublishError::Unknown
667}
668
#[cfg(test)]
mod tests {
    use similar_asserts::assert_eq as sim_assert_eq;

    /// Checks the classification of a rate-limit response, a server error and
    /// an error text without any HTTP status.
    #[test]
    fn classify_publish_error() {
        use super::PublishError;

        let cases = [
            (
                "the remote server responded with an error (status 429 Too Many Requests): You have published too many new crates in a short period of time. Please try again after Mon, 21 Apr 2025 19:31:32 GMT or email help@crates.io to have your limit increased.",
                PublishError::Retryable(http::StatusCode::TOO_MANY_REQUESTS),
            ),
            (
                "the remote server responded with an error (status 500 Internal Server Error): Internal Server Error",
                PublishError::Retryable(http::StatusCode::INTERNAL_SERVER_ERROR),
            ),
            (
                "the remote server responded with some error we don't know more about",
                PublishError::Unknown,
            ),
        ];

        for (stderr, expected) in cases {
            sim_assert_eq!(super::classify_publish_error(stderr), expected);
        }
    }
}