1use anyhow::anyhow;
8use async_compression::futures::{bufread::ZstdDecoder, write::ZstdEncoder};
9use async_tar::EntryType;
10use async_trait::async_trait;
11use async_zip::base::read;
12use core::pin::{pin, Pin};
13use futures::{
14 io::{
15 AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, AsyncWriteExt, BufReader, BufWriter, Cursor,
16 },
17 StreamExt,
18};
19use log::*;
20use octocrab::{
21 models::{repos::Release, AssetId, ReleaseId},
22 Octocrab,
23};
24use serde::{Deserialize, Serialize};
25use std::collections::{BTreeMap, BTreeSet};
26use std::io;
27use std::os::unix::fs::PermissionsExt;
28use std::path::{Component, Path, PathBuf};
29use std::time::{Duration, SystemTime};
30use tar::Archive;
31use tokio::fs;
32use tokio::process::Command;
33use tokio_util::compat::TokioAsyncReadCompatExt;
34
35pub mod header;
36
37#[derive(Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
39#[serde(rename_all = "snake_case")]
40pub enum ImportFormat {
41 Zip,
42 Zstd,
43}
44
45#[derive(Clone, Debug, Serialize, Deserialize)]
51pub struct ImportPayload {
52 pub url: String,
53 pub authorization: Option<String>,
54 pub format: ImportFormat,
55}
56
57#[derive(Clone, Debug, Serialize, Deserialize)]
59#[serde(rename_all = "snake_case")]
60pub enum ImportStatus {
61 Done,
63 Failed { reason: Option<String> },
65 Importing { status: Option<String> },
67}
68
69#[derive(Clone, Debug, Default, Serialize, Deserialize)]
71pub struct ImportCapabilities {
72 pub info: Vec<String>,
73 pub extensions: Vec<String>,
74}
75
76pub struct Error(anyhow::Error);
81
82impl core::fmt::Debug for Error {
83 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
84 self.0.fmt(f)
85 }
86}
87
88impl core::fmt::Display for Error {
89 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
90 self.0.fmt(f)
91 }
92}
93
94impl<T: Into<anyhow::Error>> From<T> for Error {
95 fn from(e: T) -> Self {
96 Self(e.into())
97 }
98}
99
100impl axum::response::IntoResponse for Error {
101 fn into_response(self) -> axum::response::Response {
102 (
103 axum::http::StatusCode::INTERNAL_SERVER_ERROR,
104 format!("{:#?}", self.0.to_string()),
105 )
106 .into_response()
107 }
108}
109
110pub type Result<T> = core::result::Result<T, Error>;
111
112#[async_trait]
113pub trait AsyncUnpack {
114 async fn unpack<'a>(
116 &'a mut self,
117 path: &'a Path,
118 in_subpath: bool,
119 stream: Pin<&'a mut (dyn AsyncRead + Send + 'a)>,
120 ) -> io::Result<()>;
121
122 async fn unpack_exec<'a>(
127 &'a mut self,
128 path: &'a Path,
129 in_subpath: bool,
130 stream: Pin<&'a mut (dyn AsyncRead + Send + 'a)>,
131 ) -> io::Result<()>;
132}
133
134#[async_trait]
135impl<
136 T: for<'a> Fn(&'a Path, bool, Pin<&'a mut (dyn AsyncRead + Send + 'a)>, bool) -> F + Send,
137 F: core::future::Future<Output = io::Result<()>> + Send,
138 > AsyncUnpack for T
139{
140 async fn unpack<'a>(
141 &'a mut self,
142 path: &'a Path,
143 in_subpath: bool,
144 stream: Pin<&'a mut (dyn AsyncRead + Send + 'a)>,
145 ) -> io::Result<()> {
146 let f = (*self)(path, in_subpath, stream, false);
147 f.await
148 }
149
150 async fn unpack_exec<'a>(
151 &'a mut self,
152 path: &'a Path,
153 in_subpath: bool,
154 stream: Pin<&'a mut (dyn AsyncRead + Send + 'a)>,
155 ) -> io::Result<()> {
156 let f = (*self)(path, in_subpath, stream, true);
157 f.await
158 }
159}
160
161pub async fn find_files_by_suffix(
162 directory: impl AsRef<Path>,
163 suffix: &str,
164) -> io::Result<Vec<fs::DirEntry>> {
165 let mut entries = vec![];
166 if let Ok(mut read_dir) = fs::read_dir(directory).await {
167 while let Ok(Some(entry)) = read_dir.next_entry().await {
168 let filetype = entry.file_type().await?;
169 if filetype.is_dir() {
170 continue;
171 }
172 if let Some(true) = entry.path().to_str().map(|s| s.ends_with(suffix)) {
173 entries.push(entry)
174 }
175 }
176 }
177 Ok(entries)
178}
179
180#[derive(Clone, Debug, Serialize, Deserialize)]
182#[serde(default)]
183pub struct PostUnpackCfg {
184 prep: bool,
186 verify_cram: bool,
188 verify_events: bool,
190 pack: bool,
192 meld: bool,
194 chop: bool,
196}
197
198impl Default for PostUnpackCfg {
199 fn default() -> Self {
200 Self {
201 prep: true,
202 verify_cram: false,
203 verify_events: true,
204 pack: true,
205 meld: true,
206 chop: true,
207 }
208 }
209}
210
211impl PostUnpackCfg {
212 pub fn verify_cram(self, verify_cram: bool) -> Self {
213 Self {
214 verify_cram,
215 ..self
216 }
217 }
218
219 pub fn verify_events(self, verify_events: bool) -> Self {
220 Self {
221 verify_events,
222 ..self
223 }
224 }
225
226 pub fn prep(self, prep: bool) -> Self {
227 Self { prep, ..self }
228 }
229
230 pub fn pack(self, pack: bool) -> Self {
231 Self { pack, ..self }
232 }
233
234 pub fn meld(self, meld: bool) -> Self {
235 Self { meld, ..self }
236 }
237
238 pub fn chop(self, chop: bool) -> Self {
239 Self { chop, ..self }
240 }
241
242 pub fn all() -> Self {
243 Self {
244 verify_cram: true,
245 verify_events: true,
246 prep: true,
247 pack: true,
248 meld: true,
249 chop: true,
250 }
251 }
252}
253
254#[derive(Debug, PartialEq, Eq, Clone, Copy)]
255struct EventCount {
256 snapshot: u64,
257 disk: u64,
258}
259
260impl EventCount {
261 fn from_info_stderr(info: &str) -> anyhow::Result<Self> {
262 let mut lines = info.lines();
263
264 while let Some(line) = lines.next() {
265 let line = line.trim();
266
267 let Some(line) = line.strip_prefix("urbit: ") else {
269 continue;
270 };
271 let Some((_, line)) = line.split_once("at event ") else {
272 continue;
273 };
274 let Ok(snapshot) = line.parse::<u64>() else {
275 continue;
276 };
277
278 trace!("Parsed snapshot event number: {snapshot}");
279
280 let Some(line) = lines.next() else { continue };
281 let line = line.trim();
282
283 if !line.contains("disk:") {
285 continue;
286 }
287 let Some((_, line)) = line.split_once("event=") else {
288 continue;
289 };
290 let Ok(disk) = line.parse::<u64>() else {
291 continue;
292 };
293
294 return Ok(Self { snapshot, disk });
295 }
296
297 Err(anyhow!("Could not parse info output"))
298 }
299}
300
301#[derive(Clone)]
302pub struct StandardUnpack<T: AsRef<Path>> {
303 path: T,
304 loom: Option<usize>,
305 event_count: Option<EventCount>,
306}
307
308impl<T: AsRef<Path>> StandardUnpack<T> {
309 pub fn loom(&self) -> Option<usize> {
310 self.loom
311 }
312}
313
314impl<T: AsRef<Path>> Drop for StandardUnpack<T> {
315 fn drop(&mut self) {
316 let path = self.path.as_ref();
317 trace!("Drop {path:?}");
318 if path.exists() {
319 if let Err(e) = std::fs::remove_dir_all(path) {
320 error!("StandardUnpack: unable to remove dir ({e:?})");
321 }
322 }
323 }
324}
325
326impl<T: AsRef<Path>> core::ops::Deref for StandardUnpack<T> {
327 type Target = Path;
328
329 fn deref(&self) -> &Self::Target {
330 self.path.as_ref()
331 }
332}
333
334impl<T: AsRef<Path>> StandardUnpack<T> {
335 pub async fn new(path: T, loom: Option<usize>) -> Result<StandardUnpack<T>> {
336 if path.as_ref().exists() {
337 info!("Remove {:?}", path.as_ref());
338 fs::remove_dir_all(path.as_ref()).await?;
339 }
340
341 fs::create_dir_all(path.as_ref()).await?;
342
343 Ok(Self {
344 path,
345 loom,
346 event_count: None,
347 })
348 }
349
350 pub async fn detect_loom(&mut self) -> Result<Option<usize>> {
351 Ok(None)
353 }
354
355 pub fn set_loom(&mut self, loom: Option<usize>) {
356 self.loom = loom;
357 }
358
359 async fn run_cmd(&mut self, args: &[&str]) -> Result<String> {
360 let mut cmd = Command::new("./.run");
361
362 cmd.current_dir(&**self).args(args);
363
364 if let Some(loom) = self.loom {
365 cmd.args(["--loom", &loom.to_string(), "-t"]);
366 }
367
368 let output = cmd.output().await?;
369
370 trace!("{:?}", std::str::from_utf8(&output.stdout));
371
372 if !output.status.success() {
373 Err(anyhow!(
374 "Command failed: {} {:?}",
375 output.status,
376 std::str::from_utf8(&output.stderr)
377 )
378 .into())
379 } else {
380 Ok(String::from_utf8_lossy(&output.stderr).into())
381 }
382 }
383
384 pub async fn store_events(mut self) -> Result<StandardUnpack<T>> {
385 debug!("Pre-work event count");
386 let err = self.run_cmd(&["-R"]).await?;
387 self.event_count = Some(EventCount::from_info_stderr(&err)?);
388 Ok(self)
389 }
390
391 pub async fn verify_events(mut self) -> Result<StandardUnpack<T>> {
392 debug!("Pre-work event count");
393 let err = self.run_cmd(&["-R"]).await?;
394 let event_count = EventCount::from_info_stderr(&err)?;
395 let Some(events) = self.event_count else {
396 return Err(anyhow!(
397 "verify_events called without previous store_events"
398 ).into());
399 };
400 if event_count == events {
401 Ok(self)
402 } else {
403 Err(anyhow!(
404 "Event count mismatch between prev={events:?} and cur={event_count:?}"
405 ).into())
406 }
407 }
408
409 pub async fn cram(mut self) -> Result<StandardUnpack<T>> {
410 debug!("Pre-work cram");
411 self.run_cmd(&["cram"]).await?;
412 Ok(self)
413 }
414
415 pub async fn verify_cram(mut self) -> Result<StandardUnpack<T>> {
416 debug!("Post-work");
417
418 let roc_dir = self.join(".urb/roc");
420 let entries = find_files_by_suffix(&roc_dir, ".jam").await?;
421
422 let [entry] = &entries[..] else {
423 return Err(anyhow!("Invalid number of jams").into());
424 };
425
426 let hash = sha256::async_digest::try_async_digest(entry.path()).await?;
427 fs::remove_file(entry.path()).await?;
428
429 debug!("Pre-work hash: {hash}");
430
431 self.run_cmd(&["cram"]).await?;
434
435 let hash2 = sha256::async_digest::try_async_digest(entry.path()).await?;
436 fs::remove_dir_all(roc_dir).await?;
437
438 debug!("Post-work hash: {hash}");
439
440 if hash == hash2 {
441 Ok(self)
442 } else {
443 Err(anyhow!("Pre and post work jam mismatch").into())
444 }
445 }
446
447 pub async fn prep(mut self) -> Result<StandardUnpack<T>> {
448 debug!("Prep");
449 self.run_cmd(&["prep"]).await?;
450 Ok(self)
451 }
452
453 pub async fn pack(mut self) -> Result<StandardUnpack<T>> {
454 debug!("Pack");
455 self.run_cmd(&["pack"]).await?;
456 Ok(self)
457 }
458
459 pub async fn meld(mut self) -> Result<StandardUnpack<T>> {
460 debug!("Meld");
461 self.run_cmd(&["meld"]).await?;
462 Ok(self)
463 }
464
465 pub async fn chop(mut self) -> Result<StandardUnpack<T>> {
466 debug!("Chop");
467 self.run_cmd(&["chop"]).await?;
468
469 let chop_dir = self.join(".urb/log/chop");
471 if chop_dir.exists() {
472 fs::remove_dir_all(chop_dir).await?;
473 }
474
475 let log_dir = self.join(".urb/log");
478 let mut max_epoch = None;
479 if let Ok(mut read_dir) = fs::read_dir(&log_dir).await {
480 while let Ok(Some(entry)) = read_dir.next_entry().await {
481 let filetype = entry.file_type().await?;
482 if !filetype.is_dir() {
483 continue;
484 }
485 let fname = entry.file_name();
486 let Some(fname) = fname.to_str() else {
487 continue;
488 };
489
490 let Some(epoch) = fname
491 .strip_prefix("0i")
492 .and_then(|v| v.parse::<usize>().ok())
493 else {
494 continue;
495 };
496
497 match max_epoch {
498 None => {
499 max_epoch = Some(epoch);
500 }
501 Some(e) if e < epoch => {
502 fs::remove_dir_all(log_dir.join(&format!("0i{e}"))).await?;
503 max_epoch = Some(epoch);
504 }
505 Some(e) if e != epoch => {
508 fs::remove_dir_all(log_dir.join(&format!("0i{epoch}"))).await?;
509 }
510 Some(_) => (),
511 }
512 }
513 }
514
515 Ok(self)
516 }
517
518 pub async fn post_unpack(
519 mut self,
520 vere_version: VereVersion,
521 db_path: &Path,
522 cache_path: &Path,
523 cfg: &PostUnpackCfg,
524 ) -> Result<StandardUnpack<T>> {
525 let db = VersionDb::load(db_path).await?;
527
528 let vere = db.bin_from_version(&vere_version, cache_path).await?;
529
530 let vere_path = self.join(".run");
531 fs::write(&vere_path, vere).await?;
532
533 let mut perms = fs::metadata(&vere_path).await?.permissions();
534 perms.set_mode(0o755);
535 fs::set_permissions(&vere_path, perms).await?;
536
537 if cfg.prep {
538 self = self.prep().await?;
539 }
540
541 if cfg.verify_cram {
542 self = self.cram().await?;
543 }
544
545 if cfg.verify_events {
546 self = self.store_events().await?;
547 }
548
549 if cfg.pack {
550 self = self.pack().await?;
551 }
552
553 if cfg.meld {
554 self = self.meld().await?;
555 }
556
557 if cfg.chop {
558 self = self.chop().await?;
559 }
560
561 if cfg.verify_cram {
562 self = self.verify_cram().await?;
563 }
564
565 if cfg.verify_events {
566 self = self.verify_events().await?;
567 }
568
569 Ok(self)
570 }
571
572 pub async fn lmdb_patp(self) -> Result<String> {
573 Err(anyhow!("Not yet implemented").into())
574 }
575}
576
577#[async_trait]
578impl<T: AsRef<Path> + Send> AsyncUnpack for StandardUnpack<T> {
579 async fn unpack<'a>(
580 &'a mut self,
581 path: &'a Path,
582 in_subpath: bool,
583 stream: Pin<&'a mut (dyn AsyncRead + Send + 'a)>,
584 ) -> io::Result<()> {
585 let path = if in_subpath {
586 path.components()
587 .skip_while(|c| c == &Component::CurDir)
588 .skip(1)
589 .collect()
590 } else {
591 path.to_path_buf()
592 };
593
594 let dir = if let Some(parent) = path.parent() {
595 self.join(parent)
596 } else {
597 self.to_path_buf()
598 };
599
600 trace!("Write file {path:?} {in_subpath} {dir:?}");
601
602 fs::create_dir_all(dir).await?;
603
604 let out_path = self.join(&path);
605
606 let file = fs::File::create(&out_path).await?;
607
608 futures::io::copy(stream, &mut file.compat()).await?;
609
610 trace!("Written file");
611
612 Ok(())
613 }
614
615 async fn unpack_exec<'a>(
616 &'a mut self,
617 path: &'a Path,
618 in_subpath: bool,
619 stream: Pin<&'a mut (dyn AsyncRead + Send + 'a)>,
620 ) -> io::Result<()> {
621 self.unpack(path, in_subpath, stream).await?;
622
623 let path = if in_subpath {
624 path.components()
625 .skip_while(|c| c == &Component::CurDir)
626 .skip(1)
627 .collect()
628 } else {
629 path.to_path_buf()
630 };
631
632 let path = self.join(&path);
633
634 trace!("Metadata {path:?}");
635
636 let mut perms = fs::metadata(&path).await?.permissions();
637 perms.set_mode(0o755);
638
639 trace!("Set perms {path:?}");
640
641 fs::set_permissions(&path, perms).await?;
642
643 trace!("Unpacked");
644
645 Ok(())
646 }
647}
648
649const MAX_SIZE: u64 = 0x400000000;
651const MAX_VERE_SIZE: u64 = 0x8000000;
653
654#[derive(Debug)]
655pub enum Pace {
656 Live,
657 Once,
658}
659
660#[derive(Default, Debug)]
661pub struct UrbitPier {
662 pub pace: Option<Pace>,
663 pub vere_hash: Option<String>,
664 }
667
668impl UrbitPier {
669 pub async fn vere_version(
674 &self,
675 db_path: &Path,
676 undocked_fallback: Option<(&str, &str)>,
677 ) -> Result<VereVersion> {
678 let db = VersionDb::load(db_path).await.unwrap_or_default();
679 let (db, updated) = db.refresh_if_older(Duration::from_secs(3600)).await?;
681
682 if updated {
683 db.save(db_path).await?;
684 }
685
686 if let Some(vere_hash) = self.vere_hash.as_ref() {
687 db.versions
688 .get(vere_hash)
689 .map(|v| v.inner.clone())
690 .ok_or_else(|| anyhow!("Could not match vere hash with version").into())
691 } else if let Some((os, arch)) = undocked_fallback {
692 debug!("No vere hash found. Falling back to latest.");
693 db.versions
694 .values()
695 .fold(None, |prev: Option<&VereVersion>, cur| {
696 if cur.inner.os == os && cur.inner.arch == arch {
697 if let Some(prev) = prev {
698 if prev
699 .version
700 .split('.')
701 .map(|v| v.parse::<u32>().unwrap_or_default())
702 .cmp(
703 cur.inner
704 .version
705 .split('.')
706 .map(|v| v.parse::<u32>().unwrap_or_default()),
707 )
708 == core::cmp::Ordering::Less
709 {
710 return Some(&cur.inner);
711 }
712 } else {
713 return Some(&cur.inner);
714 }
715 }
716 prev
717 })
718 .cloned()
719 .ok_or_else(|| anyhow!("Could not find a version for given arch").into())
720 } else {
721 Err(anyhow!("No vere in pier, and no undocked fallback set").into())
722 }
723 }
724}
725
726#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
727pub struct VereVersion {
728 pub version: String,
729 pub os: String,
730 pub arch: String,
731}
732
733#[derive(Debug, Clone, Serialize, Deserialize)]
734struct VereVersionOuter {
735 inner: VereVersion,
736 asset: AssetId,
737}
738
739#[derive(Debug, Default, Clone, Serialize, Deserialize)]
740pub struct VersionDb {
741 versions: BTreeMap<String, VereVersionOuter>,
744 latest_release: Option<ReleaseId>,
745 processed_assets: BTreeSet<AssetId>,
746 update_time: Option<SystemTime>,
747}
748
749impl VersionDb {
750 pub async fn load(path: &Path) -> Result<Self> {
751 let data = fs::read_to_string(path).await?;
752 serde_json::from_str(&data).map_err(Into::into)
753 }
754
755 async fn get_with_redirects(gh: &Octocrab, mut url: String) -> Result<Vec<u8>> {
756 let mut cnt = 0;
757
758 loop {
759 cnt += 1;
760
761 if cnt > 10 {
762 return Err(anyhow!("Too many redirects").into());
763 }
764
765 let response = gh._get(&url).await?;
766
767 if response.status() != hyper::StatusCode::OK {
768 let Some(location) = response.headers().get(hyper::header::LOCATION) else {
769 return Err(anyhow!("Error downloading vere: {}", response.status()).into());
770 };
771 url = location.to_str()?.to_string();
772 trace!("Redirect to {url}");
773 } else {
774 break Ok(hyper::body::to_bytes(response.into_body()).await?.to_vec());
775 }
776 }
777 }
778
779 async fn ensure_downloaded(&self, id: AssetId, cache_dir: &Path) -> Result<()> {
780 fs::create_dir_all(cache_dir).await?;
781
782 let asset_path = cache_dir.join(id.to_string());
783
784 if let Ok(bytes) = fs::read(&asset_path).await {
786 let digest = sha256::digest(&bytes);
787
788 if self.versions.get(&digest).map(|v| v.asset == id) == Some(true) {
789 debug!("Cached vere version found for {id}.");
790 return Ok(());
791 }
792
793 debug!("Hash mismatch for {id}. Redownloading.");
794 }
795
796 let gh = octocrab::instance();
797 let repos = gh.repos("urbit", "vere");
798 let releases = repos.releases();
799 let asset = releases.get_asset(id).await?;
800
801 let url = asset.browser_download_url.to_string();
802 debug!("Download vere from {url}");
803 let bytes = Self::get_with_redirects(&gh, url).await?;
804
805 let bytes = tokio::task::spawn_blocking(move || Self::unpack_vere(bytes))
806 .await??
807 .1;
808
809 fs::write(asset_path, bytes).await?;
810
811 Ok(())
812 }
813
814 async fn bin_from_version(&self, version: &VereVersion, cache_dir: &Path) -> Result<Vec<u8>> {
815 let asset = self
816 .versions
817 .values()
818 .find_map(|v| {
819 if &v.inner == version {
820 Some(v.asset)
821 } else {
822 None
823 }
824 })
825 .ok_or_else(|| anyhow!("Could not find compatible binary asset"))?;
826
827 self.ensure_downloaded(asset, cache_dir).await?;
828
829 Ok(fs::read(cache_dir.join(asset.to_string())).await?)
830 }
831
832 fn unpack_vere(bytes: impl AsRef<[u8]>) -> Result<(String, Vec<u8>)> {
833 use std::io::Read;
834
835 let gz = flate2::read::GzDecoder::new(bytes.as_ref());
836 let mut a = Archive::new(gz);
837
838 let mut f = a
840 .entries()?
841 .next()
842 .transpose()?
843 .ok_or_else(|| anyhow!("File empty"))?;
844
845 let mut bytes = vec![];
846 f.read_to_end(&mut bytes)?;
847
848 Ok((sha256::digest(&bytes), bytes))
850 }
851
852 async fn process_release(&mut self, gh: &Octocrab, release: Release) -> Result<()> {
853 self.latest_release = Some(release.id);
854
855 let Some(version) = release.tag_name.strip_prefix("vere-v") else {
856 warn!(
857 "Skipping {}, because it has invalid prefix.",
858 release.tag_name
859 );
860 return Ok(());
861 };
862
863 for asset in release.assets {
864 let Some(archive) = asset.name.strip_suffix(".tgz") else {
866 continue;
867 };
868 let Some((os, arch)) = archive
869 .split_once('-')
870 .map(|(a, b)| (a.to_string(), b.to_string()))
871 else {
872 warn!("Skipping {archive}, since it is not formatted as OS-Arch");
873 continue;
874 };
875
876 if self.processed_assets.contains(&asset.id) {
877 continue;
878 }
879
880 debug!("Downloading {version}: {archive}");
882
883 let bytes =
884 Self::get_with_redirects(gh, asset.browser_download_url.to_string()).await?;
885
886 let inner = VereVersion {
887 version: version.into(),
888 os,
889 arch,
890 };
891
892 let hash = tokio::task::spawn_blocking(move || Self::unpack_vere(bytes))
893 .await??
894 .0;
895
896 self.versions.insert(
897 hash.clone(),
898 VereVersionOuter {
899 inner,
900 asset: asset.id,
901 },
902 );
903
904 self.processed_assets.insert(asset.id);
905 }
906
907 Ok(())
908 }
909
910 pub async fn refresh_if_older(self, duration: Duration) -> Result<(Self, bool)> {
911 if self
912 .update_time
913 .and_then(|v| v.elapsed().ok())
914 .map(|v| v >= duration)
915 != Some(false)
916 {
917 self.refresh().await
918 } else {
919 Ok((self, false))
920 }
921 }
922
923 pub async fn refresh(mut self) -> Result<(Self, bool)> {
924 let gh = octocrab::instance();
925
926 let repos = gh.repos("urbit", "vere");
927 let releases = repos.releases();
928
929 let latest = releases.get_latest().await?;
931
932 self.update_time = Some(SystemTime::now());
933
934 if Some(latest.id) == self.latest_release {
935 info!("Only syncing latest release ({})", latest.tag_name);
936
937 self.process_release(&gh, latest).await?;
939 return Ok((self, false));
940 }
941
942 info!("Pulling new vere releases");
943
944 let mut out_rel = vec![];
948
949 for i in 0u32.. {
950 let Ok(list) = releases.list().per_page(5).page(i).send().await else {
952 break;
953 };
954
955 if list.items.is_empty() {
956 break;
957 }
958
959 let incomplete = list.incomplete_results == Some(true);
960
961 let mut hit_latest = false;
962
963 out_rel.extend(
964 list.into_iter()
965 .filter(|v| !v.draft && !v.prerelease)
966 .inspect(|v| hit_latest = hit_latest || Some(v.id) == self.latest_release),
967 );
968
969 if hit_latest {
972 break;
973 }
974
975 if incomplete {
976 return Err(anyhow!(
977 "Got incomplete results, we do not support that at the moment"
978 )
979 .into());
980 }
981 }
982
983 out_rel.sort_by_key(|v| v.created_at);
984
985 for release in out_rel {
986 debug!("Process {}", release.tag_name);
987 self.process_release(&gh, release).await?;
988 }
989
990 Ok((self, true))
991 }
992
993 pub async fn save(&self, path: &Path) -> Result<()> {
994 let data = serde_json::to_string_pretty(self)?;
995 fs::write(path, data.as_bytes()).await?;
996 Ok(())
997 }
998}
999
1000fn is_subpath(in_path: &Path, target_path: &Path) -> Option<(bool, bool)> {
1002 let mut in_components = in_path.components().skip_while(|c| c == &Component::CurDir);
1003
1004 let target_components = target_path
1005 .components()
1006 .skip_while(|c| c == &Component::CurDir);
1007
1008 let mut in_subpath = false;
1009
1010 for target in target_components {
1011 loop {
1013 let Some(inp) = in_components.next() else {
1014 return None;
1015 };
1016
1017 if target != inp {
1018 if in_subpath {
1019 return None;
1020 } else {
1021 in_subpath = true;
1022 continue;
1023 }
1024 }
1025
1026 break;
1029 }
1030 }
1031
1032 Some((in_components.next().is_some(), in_subpath))
1033}
1034
1035fn is_path(in_path: &Path, target_path: &Path) -> (bool, bool) {
1036 is_subpath(in_path, target_path)
1037 .map(|(a, b)| if !a { (true, b) } else { (false, false) })
1038 .unwrap_or_default()
1039}
1040
1041fn is_vere(in_path: &Path) -> (bool, bool) {
1042 is_path(in_path, Path::new(".run"))
1043}
1044
1045fn is_allowed_file(in_path: &Path, prt_extensions: &[impl AsRef<str>]) -> (bool, bool) {
1046 for f in [".bin/pace", ".run", ".prt/info.json"] {
1047 if let Some((false, in_subpath)) = is_subpath(in_path, Path::new(f)) {
1048 return (true, in_subpath);
1049 }
1050 }
1051
1052 for d in [".urb/get", ".urb/put", ".urb/roc"] {
1054 if is_subpath(in_path, Path::new(d)).is_some() {
1055 return (false, false);
1056 }
1057 }
1058
1059 for d in [".urb"] {
1060 if let Some((true, in_subpath)) = is_subpath(in_path, Path::new(d)) {
1061 return (true, in_subpath);
1062 }
1063 }
1064
1065 for ext in prt_extensions {
1066 if let Some((true, in_subpath)) = is_subpath(in_path, &Path::new(".prt").join(ext.as_ref()))
1067 {
1068 return (true, in_subpath);
1069 }
1070 }
1071
1072 (false, false)
1073}
1074
1075pub async fn import_zstd_stream<I: AsyncRead + Send>(
1077 stream_in: I,
1078 file_out: &mut (impl AsyncUnpack + ?Sized),
1079 prt_extensions: &[impl AsRef<str>],
1080) -> Result<UrbitPier> {
1081 import_zstd_stream_with_vere(stream_in, file_out, prt_extensions, false).await
1082}
1083
1084pub async fn import_zstd_stream_with_vere<I: AsyncRead + Send>(
1086 stream_in: I,
1087 file_out: &mut (impl AsyncUnpack + ?Sized),
1088 prt_extensions: &[impl AsRef<str>],
1089 unpack_vere: bool,
1090) -> Result<UrbitPier> {
1091 let stream_in = BufReader::new(stream_in);
1092 let mut stream_in = ZstdDecoder::new(stream_in);
1093 let stream_in = pin!(stream_in);
1094 let ar = async_tar::Archive::new(stream_in);
1095 let mut entries = ar.entries()?;
1096
1097 let mut subpath_mode = None;
1099
1100 let mut pier = UrbitPier::default();
1101
1102 while let Some(entry) = entries.next().await {
1103 let mut entry = entry?;
1104
1105 trace!("ZSTD {entry:?}");
1106
1107 if !matches!(
1108 entry.header().entry_type(),
1109 EntryType::Regular
1110 | EntryType::Continuous
1111 | EntryType::GNULongName
1112 | EntryType::GNUSparse
1113 ) {
1114 continue;
1115 }
1116
1117 let path: PathBuf = (*entry.path()?).into();
1118 let size = entry.header().size()?;
1119
1120 let (is_vere, in_subpath) = is_vere(&path);
1121
1122 trace!("ZSTD: {path:?} is_vere={is_vere} in_subpath={in_subpath}");
1123
1124 if is_vere {
1125 if subpath_mode.is_some() && Some(in_subpath) != subpath_mode {
1126 warn!(
1127 "Subpath mode does not match ({in_subpath} vs. {})",
1128 subpath_mode.unwrap()
1129 );
1130 continue;
1131 } else {
1132 subpath_mode = Some(in_subpath);
1133
1134 if size > MAX_VERE_SIZE {
1135 warn!("Vere too large ({size} bytes)",);
1136 continue;
1137 }
1138
1139 let mut vere = vec![];
1142 entry.read_to_end(&mut vere).await?;
1143 pier.vere_hash = Some(sha256::digest(&vere));
1144
1145 if unpack_vere {
1146 file_out
1147 .unpack_exec(&path, in_subpath, pin!(Cursor::new(vere)))
1148 .await?;
1149 }
1150
1151 continue;
1152 }
1153 }
1154
1155 let (is_allowed_file, in_subpath) = is_allowed_file(&path, prt_extensions);
1156
1157 if is_allowed_file {
1158 if subpath_mode.is_some() && Some(in_subpath) != subpath_mode {
1159 warn!(
1160 "Subpath mode does not match ({in_subpath} vs. {})",
1161 subpath_mode.unwrap()
1162 );
1163 continue;
1164 } else {
1165 subpath_mode = Some(in_subpath);
1166
1167 if size > MAX_SIZE {
1168 warn!("File too large ({size} bytes)",);
1169 continue;
1170 }
1171
1172 file_out.unpack(&path, in_subpath, pin!(entry)).await?;
1173 }
1174 }
1175 }
1176
1177 if subpath_mode.is_some() {
1178 Ok(pier)
1179 } else {
1180 Err(anyhow!("Pier is empty").into())
1181 }
1182}
1183
1184pub async fn import_zip_file<I: AsyncRead + AsyncSeek + Send>(
1188 stream_in: I,
1189 file_out: &mut (impl AsyncUnpack + ?Sized),
1190 prt_extensions: &[impl AsRef<str>],
1191) -> Result<UrbitPier> {
1192 let mut stream_in = BufReader::new(stream_in);
1193 let stream_in = pin!(stream_in);
1194 let mut zip = read::seek::ZipFileReader::new(stream_in).await?;
1195
1196 let mut subpath_mode = None;
1198
1199 let mut pier = UrbitPier::default();
1200
1201 for i in 0.. {
1202 let Ok(mut file) = zip.reader_with_entry(i).await else {
1203 break;
1204 };
1205
1206 if matches!(file.entry().dir(), Ok(true)) {
1208 continue;
1209 }
1210
1211 let entry = file.entry();
1212
1213 let Ok(path) = entry.filename().as_str().map(PathBuf::from) else {
1214 continue;
1215 };
1216
1217 let (is_vere, in_subpath) = is_vere(&path);
1218
1219 if is_vere {
1220 if subpath_mode.is_some() && Some(in_subpath) != subpath_mode {
1221 warn!(
1222 "Subpath mode does not match ({in_subpath} vs. {})",
1223 subpath_mode.unwrap()
1224 );
1225 continue;
1226 } else {
1227 subpath_mode = Some(in_subpath);
1228
1229 if file.entry().uncompressed_size() > MAX_VERE_SIZE {
1230 warn!(
1231 "Vere too large ({} bytes)",
1232 file.entry().uncompressed_size()
1233 );
1234 continue;
1235 }
1236
1237 let mut vere = vec![];
1240 file.read_to_end_checked(&mut vere).await?;
1241 pier.vere_hash = Some(sha256::digest(&vere));
1242 continue;
1243 }
1244 }
1245
1246 let (is_allowed_file, in_subpath) = is_allowed_file(&path, prt_extensions);
1247
1248 if is_allowed_file {
1249 if subpath_mode.is_some() && Some(in_subpath) != subpath_mode {
1250 warn!(
1251 "Subpath mode does not match ({in_subpath} vs. {})",
1252 subpath_mode.unwrap()
1253 );
1254 continue;
1255 } else {
1256 subpath_mode = Some(in_subpath);
1257
1258 if file.entry().uncompressed_size() > MAX_SIZE {
1259 warn!(
1260 "File too large ({} bytes)",
1261 file.entry().uncompressed_size()
1262 );
1263 continue;
1264 }
1265
1266 file_out.unpack(&path, in_subpath, pin!(file)).await?;
1267 }
1268 }
1269 }
1270
1271 Ok(pier)
1272}
1273
1274pub async fn export_dir(path: &Path, stream_out: (impl AsyncWrite + Send + Sync)) -> Result<()> {
1276 let stream_out = BufWriter::new(stream_out);
1277 let stream_out = ZstdEncoder::new(stream_out);
1278 let stream_out = pin!(stream_out);
1279
1280 let mut ar = async_tar::Builder::new(stream_out);
1281 ar.append_dir_all(".", path).await?;
1282 ar.finish().await?;
1283 let mut zstd = ar.into_inner().await?;
1284 zstd.close().await?;
1285 zstd.get_pin_mut().flush().await?;
1286
1287 Ok(())
1288}
1289
1290pub fn current_os_arch() -> (&'static str, &'static str) {
1291 use std::env::consts::{ARCH, OS};
1292 (OS, ARCH)
1293}