1#![allow(clippy::missing_panics_doc)]
2
3use action_core as action;
4use cargo_metadata::DependencyKind;
5use color_eyre::{eyre, Section};
6use futures::stream::{self, FuturesUnordered, StreamExt};
7use futures::Future;
8use std::collections::{HashMap, VecDeque};
9use std::path::PathBuf;
10use std::pin::Pin;
11use std::sync::{Arc, Mutex, RwLock};
12use tokio::sync::Semaphore;
13use tokio::time::{interval, sleep, Duration, Instant};
14
15#[derive(Debug)]
17pub struct Options {
18 pub path: PathBuf,
20
21 pub registry_token: Option<String>,
23
24 pub dry_run: bool,
27
28 pub publish_delay: Option<Duration>,
30
31 pub no_verify: bool,
33
34 pub resolve_versions: bool,
42
43 pub include: Option<Vec<String>>,
47
48 pub exclude: Option<Vec<String>>,
52}
53
54struct Package {
56 inner: cargo_metadata::Package,
57 path: PathBuf,
58 should_publish: bool,
59 published: Mutex<bool>,
60 deps: RwLock<HashMap<String, Arc<Package>>>,
61 dependants: RwLock<HashMap<String, Arc<Package>>>,
62}
63
64impl std::fmt::Debug for Package {
65 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
66 write!(f, "{self}")
67 }
68}
69
70impl std::fmt::Display for Package {
71 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
72 f.debug_struct("Package")
73 .field("name", &self.inner.name)
74 .field("version", &self.inner.version.to_string())
75 .field(
76 "deps",
77 &self.deps.read().unwrap().keys().collect::<Vec<_>>(),
78 )
79 .field(
80 "dependants",
81 &self.dependants.read().unwrap().keys().collect::<Vec<_>>(),
82 )
83 .finish()
84 }
85}
86
87impl Package {
88 pub fn published(&self) -> bool {
90 *self.published.lock().unwrap()
91 }
92
93 pub fn ready(&self) -> bool {
97 self.deps.read().unwrap().values().all(|d| d.published())
98 }
99
100 pub async fn is_available(&self) -> eyre::Result<bool> {
102 use crates_io_api::{AsyncClient, Error as RegistryError};
103 use semver::Version;
104
105 let api = AsyncClient::new(
106 "publish_crates (https://github.com/romnn/publish-crates)",
107 std::time::Duration::from_millis(1000),
108 )?;
109
110 let info = match api.get_crate(&self.inner.name).await {
111 Ok(info) => info,
112 Err(RegistryError::NotFound(_)) => return Ok(false),
113 Err(err) => return Err(err.into()),
114 };
115
116 let mut versions = info
117 .versions
118 .iter()
119 .filter_map(|v| match Version::parse(&v.num) {
120 Ok(version) => Some((version, v)),
121 Err(_) => None,
122 });
123 let Some((_, version))= versions.find(|(ver, _)| ver == &self.inner.version) else {
124 return Ok(false);
125 };
126
127 let client = reqwest::Client::new();
128 let dl_response = client
129 .head(format!("https://crates.io{}", version.dl_path))
130 .send()
131 .await?;
132 Ok(dl_response.status() == reqwest::StatusCode::OK)
133 }
134
135 pub async fn wait_package_available(
137 &self,
138 timeout: impl Into<Option<Duration>>,
139 ) -> eyre::Result<()> {
140 let timeout = timeout
141 .into()
142 .unwrap_or_else(|| Duration::from_secs(2 * 60));
143 let start = Instant::now();
144 let mut ticker = interval(Duration::from_secs(5));
145 loop {
146 ticker.tick().await;
147 action::info!(
148 "checking if {} {} is available",
149 self.inner.name,
150 self.inner.version.to_string()
151 );
152 if self.is_available().await? {
153 return Ok(());
154 }
155 if Instant::now().duration_since(start) > timeout {
157 eyre::bail!(
158 "exceeded timeout of {:?} waiting for crate {} {} to be published",
159 timeout,
160 self.inner.name,
161 self.inner.version.to_string()
162 );
163 }
164 }
165 }
166
167 pub async fn publish(self: Arc<Self>, options: Arc<Options>) -> eyre::Result<Arc<Self>> {
169 use async_process::Command;
170
171 action::info!("publishing {}", self.inner.name,);
172
173 let mut cmd = Command::new("cargo");
174 cmd.arg("publish");
175
176 if options.no_verify {
177 cmd.arg("--no-verify");
178 }
179 cmd.current_dir(&self.path);
180 if let Some(ref token) = options.registry_token {
181 cmd.env("CARGO_REGISTRY_TOKEN", token);
182 }
183 if options.dry_run {
184 cmd.arg("--dry-run");
185 if options.resolve_versions && !self.deps.read().unwrap().is_empty() {
188 action::info!(
191 "dry-run: proceed without `cargo publish --dry-run` for {} {} due to resolve version incompatibility",
192 &self.inner.name,
193 self.inner.version
194 );
195 *self.published.lock().unwrap() = true;
196 return Ok(self);
197 }
198 }
199 if options.resolve_versions {
200 cmd.arg("--allow-dirty");
202 }
203 let output = cmd.output().await?;
204 let stdout = String::from_utf8_lossy(&output.stdout);
205 let stderr = String::from_utf8_lossy(&output.stderr);
206 action::debug!("{}", &stdout);
207 action::debug!("{}", &stderr);
208
209 if !output.status.success() {
210 eyre::bail!("command {:?} failed: {}", cmd, stderr);
211 }
212
213 if options.dry_run {
214 action::info!(
215 "dry-run: skipping waiting for {} {} to be published",
216 &self.inner.name,
217 self.inner.version
218 );
219 *self.published.lock().unwrap() = true;
220 return Ok(self);
221 }
222
223 self.wait_package_available(None).await?;
225
226 sleep(
227 options
228 .publish_delay
229 .unwrap_or_else(|| Duration::from_secs(30)),
230 )
231 .await;
232
233 let mut cmd = Command::new("cargo");
234 cmd.arg("update");
235 cmd.current_dir(&self.path);
236 let output = cmd.output().await?;
237 if !output.status.success() {
238 eyre::bail!("command {:?} failed", cmd);
239 }
240
241 *self.published.lock().unwrap() = true;
242 action::info!("published {}", self.inner.name);
243
244 Ok(self)
245 }
246}
247
248type TaskFut = dyn Future<Output = eyre::Result<Arc<Package>>>;
249
250fn find_packages(
251 metadata: &cargo_metadata::Metadata,
252 options: Arc<Options>,
253) -> impl Iterator<Item = (PathBuf, Arc<Package>)> + '_ {
254 let packages = metadata.workspace_packages();
255 packages.into_iter().filter_map(move |package| {
256 let should_publish = package.publish.as_ref().map_or(true, |p| !p.is_empty());
257
258 let is_included = options
259 .include
260 .as_ref()
261 .map_or(true, |inc| inc.is_empty() || inc.contains(&package.name));
262
263 let is_excluded = options
264 .exclude
265 .as_ref()
266 .is_some_and(|excl| excl.contains(&package.name));
267
268 let should_publish = should_publish && is_included && !is_excluded;
269
270 let path: PathBuf = package.manifest_path.parent()?.into();
271 Some((
272 path.clone(),
273 Arc::new(Package {
274 inner: package.clone(),
275 path,
276 should_publish,
277 published: Mutex::new(false),
278 deps: RwLock::new(HashMap::new()),
279 dependants: RwLock::new(HashMap::new()),
280 }),
281 ))
282 })
283}
284
285async fn build_dag(
286 packages: Arc<HashMap<PathBuf, Arc<Package>>>,
287 options: Arc<Options>,
288) -> eyre::Result<()> {
289 let packages_iter = packages.values().filter(|p| p.should_publish);
290 let results: Vec<_> = stream::iter(packages_iter)
291 .map(|p| {
292 let packages = packages.clone();
293 let options = options.clone();
294 async move {
295 use toml_edit::{value, Document};
296 let manifest_path = &p.inner.manifest_path;
297 let manifest = tokio::fs::read_to_string(manifest_path).await?;
298 let mut manifest = manifest.parse::<Document>()?;
299 let mut need_update = false;
300
301 for dep in &p.inner.dependencies {
302 let mut dep_version = dep.req.clone();
303 if let Some(path) = dep.path.as_ref().map(PathBuf::from) {
304 let resolved = packages.get(&path).ok_or(eyre::eyre!(
308 "{}: could not resolve local dependency {}",
309 &p.inner.name,
310 path.display()
311 ))?;
312
313 if !resolved.should_publish {
317 eyre::bail!(
318 "{}: cannot publish because dependency {} will not be published",
319 &p.inner.name,
320 &dep.name,
321 );
322 }
323
324 if options.resolve_versions {
325 dep_version = semver::VersionReq {
327 comparators: vec![semver::Comparator {
328 op: semver::Op::Exact,
329 major: resolved.inner.version.major,
330 minor: Some(resolved.inner.version.minor),
331 patch: Some(resolved.inner.version.patch),
332 pre: semver::Prerelease::EMPTY,
333 }],
334 };
335
336 let changed = dep_version != dep.req;
337 if changed {
338 if let Some(kind) = match dep.kind {
340 DependencyKind::Normal => Some("dependencies"),
341 DependencyKind::Development => Some("dev-dependencies"),
342 DependencyKind::Build => Some("build-dependencies"),
343 _ => None,
344 } {
345 manifest[kind][&dep.name]["version"] =
347 value(dep_version.to_string());
348 manifest[kind][&dep.name]
349 .as_inline_table_mut()
350 .map(toml_edit::InlineTable::fmt);
351 need_update = true;
352 }
353 }
354 }
355
356 p.deps
357 .write()
358 .unwrap()
359 .insert(resolved.inner.name.clone(), resolved.clone());
360
361 resolved
362 .dependants
363 .write()
364 .unwrap()
365 .insert(p.inner.name.clone(), p.clone());
366 }
367
368 if dep_version == semver::VersionReq::STAR
369 && (dep.kind != DependencyKind::Development || dep.path.is_none())
370 {
371 return Err(eyre::eyre!(
372 "{}: dependency {} is has no specific version ({})",
373 &p.inner.name,
374 &dep.name,
375 dep_version
376 ).suggestion("to automatically resolve versions of local workspace members, use '--resolve-versions'"));
377 }
378 }
379
380 if !options.dry_run && need_update {
382 use tokio::io::AsyncWriteExt;
383 action::debug!("{}", &manifest.to_string());
384 action::warning!("{}: updating {}", &p.inner.name, &p.inner.manifest_path);
385 let mut f = tokio::fs::OpenOptions::new()
386 .write(true)
387 .truncate(true)
388 .open(&p.inner.manifest_path)
389 .await?;
390 f.write_all(manifest.to_string().as_bytes()).await?;
391 }
392
393 Ok(())
394 }
395 })
396 .buffer_unordered(8)
397 .collect()
398 .await;
399
400 results.into_iter().collect::<eyre::Result<Vec<_>>>()?;
402 Ok(())
403}
404
405pub async fn publish(options: Arc<Options>) -> eyre::Result<()> {
410 action::info!("searching cargo packages at {}", options.path.display());
411
412 let manifest_path = if options.path.is_file() {
413 options.path.clone()
414 } else {
415 options.path.join("Cargo.toml")
416 };
417 let metadata = cargo_metadata::MetadataCommand::new()
418 .manifest_path(&manifest_path)
419 .exec()?;
420
421 let packages: Arc<HashMap<PathBuf, Arc<Package>>> =
422 Arc::new(find_packages(&metadata, options.clone()).collect::<HashMap<_, _>>());
423
424 build_dag(packages.clone(), options.clone()).await?;
425
426 action::info!(
427 "found packages: {:?}",
428 packages
429 .values()
430 .map(|p| p.inner.name.clone())
431 .collect::<Vec<_>>()
432 );
433
434 if packages.is_empty() {
435 return Ok(());
437 }
438 let mut ready: VecDeque<Arc<Package>> =
439 packages.values().filter(|p| p.ready()).cloned().collect();
440
441 let mut tasks: FuturesUnordered<Pin<Box<TaskFut>>> = FuturesUnordered::new();
442 let limit = Arc::new(Semaphore::new(5));
443
444 loop {
445 if tasks.is_empty() && ready.is_empty() {
447 break;
448 }
449
450 loop {
452 let Ok(permit) = limit.clone().try_acquire_owned() else {
453 break;
454 };
455 match ready.pop_front() {
457 Some(p) if !p.should_publish => {
458 action::info!("skipping: {} (publish=false)", p.inner.name);
459 }
460 Some(p) => {
461 let options_clone = options.clone();
462 tasks.push(Box::pin(async move {
463 let res = p.publish(options_clone).await;
464 drop(permit);
465 res
466 }));
467 }
468 None => break,
470 }
471 }
472
473 match tasks.next().await {
475 Some(Err(err)) => {
476 eyre::bail!("a task failed: {}", err)
477 }
478 Some(Ok(completed)) => {
479 ready.extend(
481 completed
482 .dependants
483 .read()
484 .unwrap()
485 .values()
486 .filter(|d| d.ready() && !d.published())
487 .cloned(),
488 );
489 }
490 None => {}
491 }
492 }
493
494 if !packages
495 .values()
496 .all(|p| !p.should_publish || p.published())
497 {
498 eyre::bail!("not all published");
499 }
500
501 Ok(())
502}