// pierport/lib.rs

//! # Pierport
//!
//! Reference building blocks for the Pierport Protocol.
//!
//! This library contains building blocks for a Rust-based implementation of the Pierport protocol.
6
7use anyhow::anyhow;
8use async_compression::futures::{bufread::ZstdDecoder, write::ZstdEncoder};
9use async_tar::EntryType;
10use async_trait::async_trait;
11use async_zip::base::read;
12use core::pin::{pin, Pin};
13use futures::{
14    io::{
15        AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, AsyncWriteExt, BufReader, BufWriter, Cursor,
16    },
17    StreamExt,
18};
19use log::*;
20use octocrab::{
21    models::{repos::Release, AssetId, ReleaseId},
22    Octocrab,
23};
24use serde::{Deserialize, Serialize};
25use std::collections::{BTreeMap, BTreeSet};
26use std::io;
27use std::os::unix::fs::PermissionsExt;
28use std::path::{Component, Path, PathBuf};
29use std::time::{Duration, SystemTime};
30use tar::Archive;
31use tokio::fs;
32use tokio::process::Command;
33use tokio_util::compat::TokioAsyncReadCompatExt;
34
35pub mod header;
36
/// Supported import formats, as per pierport spec.
///
/// Serialized in `snake_case` (`"zip"`, `"zstd"`).
#[derive(Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ImportFormat {
    /// Zip archive.
    Zip,
    /// Zstandard-compressed stream.
    Zstd,
}
44
/// Import payload, as per pierport spec.
///
/// This is an alternative way to performing import requests - push a JSON payload that instructs
/// the import destination to pull the pier from alternative source. This has the potential to
/// minimize the number of intermediate copies needed.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ImportPayload {
    /// Location the destination should pull the pier from.
    pub url: String,
    /// Optional authorization value for the pull.
    /// NOTE(review): presumably sent as an `Authorization` header - confirm against the spec.
    pub authorization: Option<String>,
    /// Format of the data behind `url`.
    pub format: ImportFormat,
}
56
/// Status of the import session, as per pierport spec.
///
/// Serialized with `snake_case` variant tags.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ImportStatus {
    /// Successfully imported
    Done,
    /// Failed to import
    Failed { reason: Option<String> },
    /// The pier is importing, status may contain extra information.
    Importing { status: Option<String> },
}
68
/// Describes the capabilities exposed by the given pierport implementation.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct ImportCapabilities {
    /// Informational strings advertised by the implementation.
    pub info: Vec<String>,
    /// Names of supported protocol extensions.
    pub extensions: Vec<String>,
}
75
/// Crate-wide error type.
///
/// Thin newtype over [`anyhow::Error`], so that crate-specific trait impls
/// (e.g. `axum::response::IntoResponse`) can be attached to it.
///
/// FIXME: this is a bit of a hack to simply wrap anyhow error, and we should split this into more
/// specific error enum, but this will do for now.
pub struct Error(anyhow::Error);
81
82impl core::fmt::Debug for Error {
83    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
84        self.0.fmt(f)
85    }
86}
87
88impl core::fmt::Display for Error {
89    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
90        self.0.fmt(f)
91    }
92}
93
94impl<T: Into<anyhow::Error>> From<T> for Error {
95    fn from(e: T) -> Self {
96        Self(e.into())
97    }
98}
99
100impl axum::response::IntoResponse for Error {
101    fn into_response(self) -> axum::response::Response {
102        (
103            axum::http::StatusCode::INTERNAL_SERVER_ERROR,
104            format!("{:#?}", self.0.to_string()),
105        )
106            .into_response()
107    }
108}
109
/// Crate-wide result alias using [`Error`].
pub type Result<T> = core::result::Result<T, Error>;
111
#[async_trait]
pub trait AsyncUnpack {
    /// Unpack a file at given path.
    ///
    /// `path` is the archive-relative entry path and `stream` provides its
    /// contents. When `in_subpath` is true the entry is nested one directory
    /// below the archive root, and implementations strip that leading
    /// component (see `StandardUnpack::unpack`).
    async fn unpack<'a>(
        &'a mut self,
        path: &'a Path,
        in_subpath: bool,
        stream: Pin<&'a mut (dyn AsyncRead + Send + 'a)>,
    ) -> io::Result<()>;

    /// Unpack an executable at given path.
    ///
    /// The only difference from `unpack` is that this expects the resulting file to be marked as
    /// executable.
    async fn unpack_exec<'a>(
        &'a mut self,
        path: &'a Path,
        in_subpath: bool,
        stream: Pin<&'a mut (dyn AsyncRead + Send + 'a)>,
    ) -> io::Result<()>;
}
133
134#[async_trait]
135impl<
136        T: for<'a> Fn(&'a Path, bool, Pin<&'a mut (dyn AsyncRead + Send + 'a)>, bool) -> F + Send,
137        F: core::future::Future<Output = io::Result<()>> + Send,
138    > AsyncUnpack for T
139{
140    async fn unpack<'a>(
141        &'a mut self,
142        path: &'a Path,
143        in_subpath: bool,
144        stream: Pin<&'a mut (dyn AsyncRead + Send + 'a)>,
145    ) -> io::Result<()> {
146        let f = (*self)(path, in_subpath, stream, false);
147        f.await
148    }
149
150    async fn unpack_exec<'a>(
151        &'a mut self,
152        path: &'a Path,
153        in_subpath: bool,
154        stream: Pin<&'a mut (dyn AsyncRead + Send + 'a)>,
155    ) -> io::Result<()> {
156        let f = (*self)(path, in_subpath, stream, true);
157        f.await
158    }
159}
160
161pub async fn find_files_by_suffix(
162    directory: impl AsRef<Path>,
163    suffix: &str,
164) -> io::Result<Vec<fs::DirEntry>> {
165    let mut entries = vec![];
166    if let Ok(mut read_dir) = fs::read_dir(directory).await {
167        while let Ok(Some(entry)) = read_dir.next_entry().await {
168            let filetype = entry.file_type().await?;
169            if filetype.is_dir() {
170                continue;
171            }
172            if let Some(true) = entry.path().to_str().map(|s| s.ends_with(suffix)) {
173                entries.push(entry)
174            }
175        }
176    }
177    Ok(entries)
178}
179
/// Actions to be taken after pier is unpacked.
///
/// Any field missing during deserialization is filled from [`Default`]
/// (`#[serde(default)]`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct PostUnpackCfg {
    /// Catch up the event log.
    prep: bool,
    /// Cram and verify that the cram hasn't changed after subsequent steps.
    verify_cram: bool,
    /// Store snapshot and event log event count, and verify it.
    verify_events: bool,
    /// Run vere pack.
    pack: bool,
    /// Run vere meld.
    meld: bool,
    /// Run vere chop.
    chop: bool,
}
197
impl Default for PostUnpackCfg {
    /// Default pipeline: every step enabled except cram verification.
    fn default() -> Self {
        Self {
            prep: true,
            verify_cram: false,
            verify_events: true,
            pack: true,
            meld: true,
            chop: true,
        }
    }
}
210
211impl PostUnpackCfg {
212    pub fn verify_cram(self, verify_cram: bool) -> Self {
213        Self {
214            verify_cram,
215            ..self
216        }
217    }
218
219    pub fn verify_events(self, verify_events: bool) -> Self {
220        Self {
221            verify_events,
222            ..self
223        }
224    }
225
226    pub fn prep(self, prep: bool) -> Self {
227        Self { prep, ..self }
228    }
229
230    pub fn pack(self, pack: bool) -> Self {
231        Self { pack, ..self }
232    }
233
234    pub fn meld(self, meld: bool) -> Self {
235        Self { meld, ..self }
236    }
237
238    pub fn chop(self, chop: bool) -> Self {
239        Self { chop, ..self }
240    }
241
242    pub fn all() -> Self {
243        Self {
244            verify_cram: true,
245            verify_events: true,
246            prep: true,
247            pack: true,
248            meld: true,
249            chop: true,
250        }
251    }
252}
253
/// Event numbers reported by the runtime, used to verify that maintenance
/// steps did not lose events.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
struct EventCount {
    /// Event number the snapshot is at (from the `urbit: ... at event N` line).
    snapshot: u64,
    /// Event number of the on-disk event log (from the `disk: ... event=N` line).
    disk: u64,
}
259
impl EventCount {
    /// Extracts snapshot and event-log event numbers from vere's stderr.
    ///
    /// Scans for an `urbit: ... at event <n>` line immediately followed by a
    /// `disk: ... event=<n>` line. Candidate lines that fail to parse are
    /// skipped and scanning continues; errors if no such pair exists.
    fn from_info_stderr(info: &str) -> anyhow::Result<Self> {
        let mut lines = info.lines();

        // `while let` (not `for`) because the disk line is pulled from the
        // same iterator inside the loop body.
        while let Some(line) = lines.next() {
            let line = line.trim();

            // Parse snapshot event number
            let Some(line) = line.strip_prefix("urbit: ") else {
                continue;
            };
            let Some((_, line)) = line.split_once("at event ") else {
                continue;
            };
            let Ok(snapshot) = line.parse::<u64>() else {
                continue;
            };

            trace!("Parsed snapshot event number: {snapshot}");

            let Some(line) = lines.next() else { continue };
            let line = line.trim();

            // Parse the disk (event log) event number
            if !line.contains("disk:") {
                continue;
            }
            let Some((_, line)) = line.split_once("event=") else {
                continue;
            };
            let Ok(disk) = line.parse::<u64>() else {
                continue;
            };

            return Ok(Self { snapshot, disk });
        }

        Err(anyhow!("Could not parse info output"))
    }
}
300
/// A pier unpack target rooted at `path`.
///
/// The directory is (re)created by [`StandardUnpack::new`] and removed again
/// when the value is dropped.
///
/// NOTE(review): `Clone` combined with the `Drop` impl means two clones both
/// try to remove the same directory - confirm this is intended.
#[derive(Clone)]
pub struct StandardUnpack<T: AsRef<Path>> {
    /// Root directory files are unpacked into.
    path: T,
    /// Loom size passed to vere as `--loom`, if set.
    loom: Option<usize>,
    /// Counts captured by `store_events`, checked by `verify_events`.
    event_count: Option<EventCount>,
}
307
impl<T: AsRef<Path>> StandardUnpack<T> {
    /// Returns the configured loom size, if any.
    pub fn loom(&self) -> Option<usize> {
        self.loom
    }
}
313
314impl<T: AsRef<Path>> Drop for StandardUnpack<T> {
315    fn drop(&mut self) {
316        let path = self.path.as_ref();
317        trace!("Drop {path:?}");
318        if path.exists() {
319            if let Err(e) = std::fs::remove_dir_all(path) {
320                error!("StandardUnpack: unable to remove dir ({e:?})");
321            }
322        }
323    }
324}
325
impl<T: AsRef<Path>> core::ops::Deref for StandardUnpack<T> {
    type Target = Path;

    /// Dereferences to the unpack root directory.
    fn deref(&self) -> &Self::Target {
        self.path.as_ref()
    }
}
333
334impl<T: AsRef<Path>> StandardUnpack<T> {
335    pub async fn new(path: T, loom: Option<usize>) -> Result<StandardUnpack<T>> {
336        if path.as_ref().exists() {
337            info!("Remove {:?}", path.as_ref());
338            fs::remove_dir_all(path.as_ref()).await?;
339        }
340
341        fs::create_dir_all(path.as_ref()).await?;
342
343        Ok(Self {
344            path,
345            loom,
346            event_count: None,
347        })
348    }
349
350    pub async fn detect_loom(&mut self) -> Result<Option<usize>> {
351        // TODO: autodetect loom based on the snapshot size
352        Ok(None)
353    }
354
355    pub fn set_loom(&mut self, loom: Option<usize>) {
356        self.loom = loom;
357    }
358
359    async fn run_cmd(&mut self, args: &[&str]) -> Result<String> {
360        let mut cmd = Command::new("./.run");
361
362        cmd.current_dir(&**self).args(args);
363
364        if let Some(loom) = self.loom {
365            cmd.args(["--loom", &loom.to_string(), "-t"]);
366        }
367
368        let output = cmd.output().await?;
369
370        trace!("{:?}", std::str::from_utf8(&output.stdout));
371
372        if !output.status.success() {
373            Err(anyhow!(
374                "Command failed: {} {:?}",
375                output.status,
376                std::str::from_utf8(&output.stderr)
377            )
378            .into())
379        } else {
380            Ok(String::from_utf8_lossy(&output.stderr).into())
381        }
382    }
383
384    pub async fn store_events(mut self) -> Result<StandardUnpack<T>> {
385        debug!("Pre-work event count");
386        let err = self.run_cmd(&["-R"]).await?;
387        self.event_count = Some(EventCount::from_info_stderr(&err)?);
388        Ok(self)
389    }
390
391    pub async fn verify_events(mut self) -> Result<StandardUnpack<T>> {
392        debug!("Pre-work event count");
393        let err = self.run_cmd(&["-R"]).await?;
394        let event_count = EventCount::from_info_stderr(&err)?;
395        let Some(events) = self.event_count else {
396            return Err(anyhow!(
397                "verify_events called without previous store_events"
398            ).into());
399        };
400        if event_count == events {
401            Ok(self)
402        } else {
403            Err(anyhow!(
404                "Event count mismatch between prev={events:?} and cur={event_count:?}"
405            ).into())
406        }
407    }
408
409    pub async fn cram(mut self) -> Result<StandardUnpack<T>> {
410        debug!("Pre-work cram");
411        self.run_cmd(&["cram"]).await?;
412        Ok(self)
413    }
414
415    pub async fn verify_cram(mut self) -> Result<StandardUnpack<T>> {
416        debug!("Post-work");
417
418        // Get the path to current jam
419        let roc_dir = self.join(".urb/roc");
420        let entries = find_files_by_suffix(&roc_dir, ".jam").await?;
421
422        let [entry] = &entries[..] else {
423            return Err(anyhow!("Invalid number of jams").into());
424        };
425
426        let hash = sha256::async_digest::try_async_digest(entry.path()).await?;
427        fs::remove_file(entry.path()).await?;
428
429        debug!("Pre-work hash: {hash}");
430
431        // Get the current hash of the current jam
432
433        self.run_cmd(&["cram"]).await?;
434
435        let hash2 = sha256::async_digest::try_async_digest(entry.path()).await?;
436        fs::remove_dir_all(roc_dir).await?;
437
438        debug!("Post-work hash: {hash}");
439
440        if hash == hash2 {
441            Ok(self)
442        } else {
443            Err(anyhow!("Pre and post work jam mismatch").into())
444        }
445    }
446
447    pub async fn prep(mut self) -> Result<StandardUnpack<T>> {
448        debug!("Prep");
449        self.run_cmd(&["prep"]).await?;
450        Ok(self)
451    }
452
453    pub async fn pack(mut self) -> Result<StandardUnpack<T>> {
454        debug!("Pack");
455        self.run_cmd(&["pack"]).await?;
456        Ok(self)
457    }
458
459    pub async fn meld(mut self) -> Result<StandardUnpack<T>> {
460        debug!("Meld");
461        self.run_cmd(&["meld"]).await?;
462        Ok(self)
463    }
464
465    pub async fn chop(mut self) -> Result<StandardUnpack<T>> {
466        debug!("Chop");
467        self.run_cmd(&["chop"]).await?;
468
469        // Cleans up all pre-3.0 chops
470        let chop_dir = self.join(".urb/log/chop");
471        if chop_dir.exists() {
472            fs::remove_dir_all(chop_dir).await?;
473        }
474
475        // Cleans up all 3.0+ chops
476        // Remove all epochs, but the latest one
477        let log_dir = self.join(".urb/log");
478        let mut max_epoch = None;
479        if let Ok(mut read_dir) = fs::read_dir(&log_dir).await {
480            while let Ok(Some(entry)) = read_dir.next_entry().await {
481                let filetype = entry.file_type().await?;
482                if !filetype.is_dir() {
483                    continue;
484                }
485                let fname = entry.file_name();
486                let Some(fname) = fname.to_str() else {
487                    continue;
488                };
489
490                let Some(epoch) = fname
491                    .strip_prefix("0i")
492                    .and_then(|v| v.parse::<usize>().ok())
493                else {
494                    continue;
495                };
496
497                match max_epoch {
498                    None => {
499                        max_epoch = Some(epoch);
500                    }
501                    Some(e) if e < epoch => {
502                        fs::remove_dir_all(log_dir.join(&format!("0i{e}"))).await?;
503                        max_epoch = Some(epoch);
504                    }
505                    // This branch should always be hit, but don't make it unconditional, just to
506                    // be sure that we are not deleting the latest epoch.
507                    Some(e) if e != epoch => {
508                        fs::remove_dir_all(log_dir.join(&format!("0i{epoch}"))).await?;
509                    }
510                    Some(_) => (),
511                }
512            }
513        }
514
515        Ok(self)
516    }
517
518    pub async fn post_unpack(
519        mut self,
520        vere_version: VereVersion,
521        db_path: &Path,
522        cache_path: &Path,
523        cfg: &PostUnpackCfg,
524    ) -> Result<StandardUnpack<T>> {
525        // We need to correct the vere architecture to something
526        let db = VersionDb::load(db_path).await?;
527
528        let vere = db.bin_from_version(&vere_version, cache_path).await?;
529
530        let vere_path = self.join(".run");
531        fs::write(&vere_path, vere).await?;
532
533        let mut perms = fs::metadata(&vere_path).await?.permissions();
534        perms.set_mode(0o755);
535        fs::set_permissions(&vere_path, perms).await?;
536
537        if cfg.prep {
538            self = self.prep().await?;
539        }
540
541        if cfg.verify_cram {
542            self = self.cram().await?;
543        }
544
545        if cfg.verify_events {
546            self = self.store_events().await?;
547        }
548
549        if cfg.pack {
550            self = self.pack().await?;
551        }
552
553        if cfg.meld {
554            self = self.meld().await?;
555        }
556
557        if cfg.chop {
558            self = self.chop().await?;
559        }
560
561        if cfg.verify_cram {
562            self = self.verify_cram().await?;
563        }
564
565        if cfg.verify_events {
566            self = self.verify_events().await?;
567        }
568
569        Ok(self)
570    }
571
572    pub async fn lmdb_patp(self) -> Result<String> {
573        Err(anyhow!("Not yet implemented").into())
574    }
575}
576
#[async_trait]
impl<T: AsRef<Path> + Send> AsyncUnpack for StandardUnpack<T> {
    /// Writes `stream` to `path` under the unpack root, creating parent
    /// directories as needed.
    ///
    /// When `in_subpath` is true, any leading `./` components and then the
    /// first real component (the archive's top-level directory) are stripped.
    async fn unpack<'a>(
        &'a mut self,
        path: &'a Path,
        in_subpath: bool,
        stream: Pin<&'a mut (dyn AsyncRead + Send + 'a)>,
    ) -> io::Result<()> {
        let path = if in_subpath {
            path.components()
                .skip_while(|c| c == &Component::CurDir)
                .skip(1)
                .collect()
        } else {
            path.to_path_buf()
        };

        let dir = if let Some(parent) = path.parent() {
            self.join(parent)
        } else {
            self.to_path_buf()
        };

        trace!("Write file {path:?} {in_subpath} {dir:?}");

        fs::create_dir_all(dir).await?;

        let out_path = self.join(&path);

        let file = fs::File::create(&out_path).await?;

        // NOTE(review): the file is not explicitly flushed after the copy;
        // tokio recommends flushing before drop - confirm no data can be
        // lost here.
        futures::io::copy(stream, &mut file.compat()).await?;

        trace!("Written file");

        Ok(())
    }

    /// Same as `unpack`, then marks the written file executable (0o755).
    async fn unpack_exec<'a>(
        &'a mut self,
        path: &'a Path,
        in_subpath: bool,
        stream: Pin<&'a mut (dyn AsyncRead + Send + 'a)>,
    ) -> io::Result<()> {
        self.unpack(path, in_subpath, stream).await?;

        // Re-derive the on-disk path the same way `unpack` did.
        let path = if in_subpath {
            path.components()
                .skip_while(|c| c == &Component::CurDir)
                .skip(1)
                .collect()
        } else {
            path.to_path_buf()
        };

        let path = self.join(&path);

        trace!("Metadata {path:?}");

        let mut perms = fs::metadata(&path).await?.permissions();
        perms.set_mode(0o755);

        trace!("Set perms {path:?}");

        fs::set_permissions(&path, perms).await?;

        trace!("Unpacked");

        Ok(())
    }
}
648
/// Maximum allowed pier size: 16 GiB.
/// NOTE(review): not referenced in this chunk - presumably enforced by callers.
const MAX_SIZE: u64 = 0x400000000;
/// Maximum allowed vere binary size: 128 MiB.
/// NOTE(review): not referenced in this chunk.
const MAX_VERE_SIZE: u64 = 0x8000000;
653
/// Pier run pace.
///
/// NOTE(review): variants are unused within this chunk; presumably parsed
/// from the pier's `.bin/pace` file (allow-listed below) - confirm semantics.
#[derive(Debug)]
pub enum Pace {
    Live,
    Once,
}
659
/// Metadata discovered about an unpacked urbit pier.
#[derive(Default, Debug)]
pub struct UrbitPier {
    /// Run pace, when detected.
    pub pace: Option<Pace>,
    /// Hash of the pier's bundled vere binary, when present; matched against
    /// the sha256-keyed [`VersionDb`].
    pub vere_hash: Option<String>,
    // Vere version hash matched against database of whitelisted runtimes
    //pub vere_version: Option<String>,
}
667
impl UrbitPier {
    /// Attempts to match the pier's vere hash with the officially released vere.
    ///
    /// If the pier has no vere binary, and `undocked_fallback` specifies target OS and arch, then
    /// this will take the latest vere release that is for the given os-arch pair.
    ///
    /// Refreshes (and persists) the version database at `db_path` when it is
    /// more than an hour old.
    pub async fn vere_version(
        &self,
        db_path: &Path,
        undocked_fallback: Option<(&str, &str)>,
    ) -> Result<VereVersion> {
        let db = VersionDb::load(db_path).await.unwrap_or_default();
        // Refresh if database is older than 1 hour
        let (db, updated) = db.refresh_if_older(Duration::from_secs(3600)).await?;

        if updated {
            db.save(db_path).await?;
        }

        if let Some(vere_hash) = self.vere_hash.as_ref() {
            db.versions
                .get(vere_hash)
                .map(|v| v.inner.clone())
                .ok_or_else(|| anyhow!("Could not match vere hash with version").into())
        } else if let Some((os, arch)) = undocked_fallback {
            debug!("No vere hash found. Falling back to latest.");
            // Pick the highest version for the requested os/arch pair.
            // Versions compare component-wise as integers ("1.10" > "1.9");
            // non-numeric components compare as 0.
            db.versions
                .values()
                .fold(None, |prev: Option<&VereVersion>, cur| {
                    if cur.inner.os == os && cur.inner.arch == arch {
                        if let Some(prev) = prev {
                            if prev
                                .version
                                .split('.')
                                .map(|v| v.parse::<u32>().unwrap_or_default())
                                .cmp(
                                    cur.inner
                                        .version
                                        .split('.')
                                        .map(|v| v.parse::<u32>().unwrap_or_default()),
                                )
                                == core::cmp::Ordering::Less
                            {
                                return Some(&cur.inner);
                            }
                        } else {
                            return Some(&cur.inner);
                        }
                    }
                    prev
                })
                .cloned()
                .ok_or_else(|| anyhow!("Could not find a version for given arch").into())
        } else {
            Err(anyhow!("No vere in pier, and no undocked fallback set").into())
        }
    }
}
725
/// An official vere release, identified by version string, OS and architecture.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct VereVersion {
    /// Release version with the `vere-v` tag prefix stripped.
    pub version: String,
    /// Target OS (first half of the `<os>-<arch>.tgz` asset name).
    pub os: String,
    /// Target architecture (second half of the asset name).
    pub arch: String,
}
732
/// A [`VereVersion`] together with the GitHub release asset it came from.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct VereVersionOuter {
    inner: VereVersion,
    /// GitHub asset id the binary can be (re)downloaded from.
    asset: AssetId,
}
738
/// Persistent database of known vere releases, keyed by the SHA-256 of the
/// unpacked binary.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct VersionDb {
    // HashMap would be more efficient, but the map is not going to be huge anyways, and sorting
    // everything would be better.
    versions: BTreeMap<String, VereVersionOuter>,
    /// Most recently seen GitHub release, used to short-circuit refreshes.
    latest_release: Option<ReleaseId>,
    /// Assets already ingested, so refreshes skip re-downloading them.
    processed_assets: BTreeSet<AssetId>,
    /// Time of the last successful refresh.
    update_time: Option<SystemTime>,
}
748
impl VersionDb {
    /// Loads the database from a JSON file at `path`.
    pub async fn load(path: &Path) -> Result<Self> {
        let data = fs::read_to_string(path).await?;
        serde_json::from_str(&data).map_err(Into::into)
    }

    /// GETs `url`, manually following up to 10 `Location` redirects; any
    /// other non-OK response is an error.
    async fn get_with_redirects(gh: &Octocrab, mut url: String) -> Result<Vec<u8>> {
        let mut cnt = 0;

        loop {
            cnt += 1;

            if cnt > 10 {
                return Err(anyhow!("Too many redirects").into());
            }

            let response = gh._get(&url).await?;

            if response.status() != hyper::StatusCode::OK {
                // Non-OK without a Location header is a hard failure.
                let Some(location) = response.headers().get(hyper::header::LOCATION) else {
                    return Err(anyhow!("Error downloading vere: {}", response.status()).into());
                };
                url = location.to_str()?.to_string();
                trace!("Redirect to {url}");
            } else {
                break Ok(hyper::body::to_bytes(response.into_body()).await?.to_vec());
            }
        }
    }

    /// Ensures the binary for asset `id` exists in `cache_dir` with the
    /// expected hash, downloading and unpacking it when missing or stale.
    async fn ensure_downloaded(&self, id: AssetId, cache_dir: &Path) -> Result<()> {
        fs::create_dir_all(cache_dir).await?;

        let asset_path = cache_dir.join(id.to_string());

        // Check if there's existing asset with correct hash
        if let Ok(bytes) = fs::read(&asset_path).await {
            let digest = sha256::digest(&bytes);

            if self.versions.get(&digest).map(|v| v.asset == id) == Some(true) {
                debug!("Cached vere version found for {id}.");
                return Ok(());
            }

            debug!("Hash mismatch for {id}. Redownloading.");
        }

        let gh = octocrab::instance();
        let repos = gh.repos("urbit", "vere");
        let releases = repos.releases();
        let asset = releases.get_asset(id).await?;

        let url = asset.browser_download_url.to_string();
        debug!("Download vere from {url}");
        let bytes = Self::get_with_redirects(&gh, url).await?;

        // Unpacking is CPU-bound; run it off the async executor.
        let bytes = tokio::task::spawn_blocking(move || Self::unpack_vere(bytes))
            .await??
            .1;

        fs::write(asset_path, bytes).await?;

        Ok(())
    }

    /// Returns the raw vere binary for `version`, downloading it into
    /// `cache_dir` if necessary.
    async fn bin_from_version(&self, version: &VereVersion, cache_dir: &Path) -> Result<Vec<u8>> {
        let asset = self
            .versions
            .values()
            .find_map(|v| {
                if &v.inner == version {
                    Some(v.asset)
                } else {
                    None
                }
            })
            .ok_or_else(|| anyhow!("Could not find compatible binary asset"))?;

        self.ensure_downloaded(asset, cache_dir).await?;

        Ok(fs::read(cache_dir.join(asset.to_string())).await?)
    }

    /// Extracts the single file of a gzipped tarball, returning its SHA-256
    /// hash together with its bytes.
    fn unpack_vere(bytes: impl AsRef<[u8]>) -> Result<(String, Vec<u8>)> {
        use std::io::Read;

        let gz = flate2::read::GzDecoder::new(bytes.as_ref());
        let mut a = Archive::new(gz);

        // We expect only a single file here
        let mut f = a
            .entries()?
            .next()
            .transpose()?
            .ok_or_else(|| anyhow!("File empty"))?;

        let mut bytes = vec![];
        f.read_to_end(&mut bytes)?;

        // Compute hash of the executable
        Ok((sha256::digest(&bytes), bytes))
    }

    /// Ingests one GitHub release: records its id as the latest seen and
    /// adds every `<os>-<arch>.tgz` asset (hash -> version) to the database.
    async fn process_release(&mut self, gh: &Octocrab, release: Release) -> Result<()> {
        self.latest_release = Some(release.id);

        // Only `vere-v<version>` tags are releases we understand.
        let Some(version) = release.tag_name.strip_prefix("vere-v") else {
            warn!(
                "Skipping {}, because it has invalid prefix.",
                release.tag_name
            );
            return Ok(());
        };

        for asset in release.assets {
            // Only process tgz archives
            let Some(archive) = asset.name.strip_suffix(".tgz") else {
                continue;
            };
            let Some((os, arch)) = archive
                .split_once('-')
                .map(|(a, b)| (a.to_string(), b.to_string()))
            else {
                warn!("Skipping {archive}, since it is not formatted as OS-Arch");
                continue;
            };

            if self.processed_assets.contains(&asset.id) {
                continue;
            }

            // Now, pull the binary
            debug!("Downloading {version}: {archive}");

            let bytes =
                Self::get_with_redirects(gh, asset.browser_download_url.to_string()).await?;

            let inner = VereVersion {
                version: version.into(),
                os,
                arch,
            };

            // Hash the unpacked binary off-executor; the hash keys the db.
            let hash = tokio::task::spawn_blocking(move || Self::unpack_vere(bytes))
                .await??
                .0;

            self.versions.insert(
                hash.clone(),
                VereVersionOuter {
                    inner,
                    asset: asset.id,
                },
            );

            self.processed_assets.insert(asset.id);
        }

        Ok(())
    }

    /// Refreshes from GitHub when the database has never been updated or is
    /// older than `duration`; otherwise returns `(self, false)` untouched.
    pub async fn refresh_if_older(self, duration: Duration) -> Result<(Self, bool)> {
        // `!= Some(false)` also triggers a refresh when `update_time` is
        // unset or `elapsed()` fails (clock went backwards).
        if self
            .update_time
            .and_then(|v| v.elapsed().ok())
            .map(|v| v >= duration)
            != Some(false)
        {
            self.refresh().await
        } else {
            Ok((self, false))
        }
    }

    /// Syncs the database against the `urbit/vere` GitHub releases.
    ///
    /// Returns `(db, true)` when new releases were pulled; when the latest
    /// release id is unchanged only its assets are re-processed and `false`
    /// is returned.
    pub async fn refresh(mut self) -> Result<(Self, bool)> {
        let gh = octocrab::instance();

        let repos = gh.repos("urbit", "vere");
        let releases = repos.releases();

        // Fetch latest urbit release
        let latest = releases.get_latest().await?;

        self.update_time = Some(SystemTime::now());

        if Some(latest.id) == self.latest_release {
            info!("Only syncing latest release ({})", latest.tag_name);

            // process assets of the latest release to make sure any asset changes are synced up.
            self.process_release(&gh, latest).await?;
            return Ok((self, false));
        }

        info!("Pulling new vere releases");

        // If we have a mismatch of the latest release, pull all releases, sort them by created_at
        // attribute, and update our ID.

        let mut out_rel = vec![];

        for i in 0u32.. {
            // Pull 5 releases at a time, to not hit timeout conditions.
            // NOTE(review): GitHub pages are 1-based; starting `i` at 0 may
            // fetch the first page twice - confirm octocrab's semantics.
            let Ok(list) = releases.list().per_page(5).page(i).send().await else {
                break;
            };

            if list.items.is_empty() {
                break;
            }

            let incomplete = list.incomplete_results == Some(true);

            let mut hit_latest = false;

            out_rel.extend(
                list.into_iter()
                    .filter(|v| !v.draft && !v.prerelease)
                    .inspect(|v| hit_latest = hit_latest || Some(v.id) == self.latest_release),
            );

            // We hit the latest release we have at the moment. We assume further releases will be
            // old ones, so that we do not have to pull them again.
            if hit_latest {
                break;
            }

            if incomplete {
                return Err(anyhow!(
                    "Got incomplete results, we do not support that at the moment"
                )
                .into());
            }
        }

        // Process oldest-first so `latest_release` ends up pointing at the
        // newest processed release.
        out_rel.sort_by_key(|v| v.created_at);

        for release in out_rel {
            debug!("Process {}", release.tag_name);
            self.process_release(&gh, release).await?;
        }

        Ok((self, true))
    }

    /// Serializes the database as pretty JSON to `path`.
    pub async fn save(&self, path: &Path) -> Result<()> {
        let data = serde_json::to_string_pretty(self)?;
        fs::write(path, data.as_bytes()).await?;
        Ok(())
    }
}
999
/// Tests whether `in_path` matches or lies below `target_path`.
///
/// Leading `./` components are ignored on both sides, and the input may carry
/// at most one extra leading component (e.g. the pier directory name).
///
/// Returns `None` when the input does not match the target at all, otherwise
/// `Some((has_subcomponents, in_subpath))`: `has_subcomponents` means the
/// input continues below the target, `in_subpath` means one leading component
/// was skipped to find the match.
fn is_subpath(in_path: &Path, target_path: &Path) -> Option<(bool, bool)> {
    let mut input = in_path.components().skip_while(|c| c == &Component::CurDir);

    let mut in_subpath = false;

    for expected in target_path
        .components()
        .skip_while(|c| c == &Component::CurDir)
    {
        // One target component may consume up to two input components: the
        // single permitted leading skip, then the matching one.
        loop {
            let got = input.next()?;

            if got == expected {
                break;
            }

            if in_subpath {
                // Already used our one skip - this is not a match.
                return None;
            }

            in_subpath = true;
        }
    }

    Some((input.next().is_some(), in_subpath))
}
1034
1035fn is_path(in_path: &Path, target_path: &Path) -> (bool, bool) {
1036    is_subpath(in_path, target_path)
1037        .map(|(a, b)| if !a { (true, b) } else { (false, false) })
1038        .unwrap_or_default()
1039}
1040
1041fn is_vere(in_path: &Path) -> (bool, bool) {
1042    is_path(in_path, Path::new(".run"))
1043}
1044
1045fn is_allowed_file(in_path: &Path, prt_extensions: &[impl AsRef<str>]) -> (bool, bool) {
1046    for f in [".bin/pace", ".run", ".prt/info.json"] {
1047        if let Some((false, in_subpath)) = is_subpath(in_path, Path::new(f)) {
1048            return (true, in_subpath);
1049        }
1050    }
1051
1052    // Exclude any of the following, because we simply don't need them
1053    for d in [".urb/get", ".urb/put", ".urb/roc"] {
1054        if is_subpath(in_path, Path::new(d)).is_some() {
1055            return (false, false);
1056        }
1057    }
1058
1059    for d in [".urb"] {
1060        if let Some((true, in_subpath)) = is_subpath(in_path, Path::new(d)) {
1061            return (true, in_subpath);
1062        }
1063    }
1064
1065    for ext in prt_extensions {
1066        if let Some((true, in_subpath)) = is_subpath(in_path, &Path::new(".prt").join(ext.as_ref()))
1067        {
1068            return (true, in_subpath);
1069        }
1070    }
1071
1072    (false, false)
1073}
1074
/// Filters files on a zstd stream, and outputs them to given function
///
/// Convenience wrapper around [`import_zstd_stream_with_vere`] with
/// `unpack_vere = false`: the vere binary is never forwarded to `file_out`,
/// though its hash is still recorded in the returned [`UrbitPier`].
pub async fn import_zstd_stream<I: AsyncRead + Send>(
    stream_in: I,
    file_out: &mut (impl AsyncUnpack + ?Sized),
    prt_extensions: &[impl AsRef<str>],
) -> Result<UrbitPier> {
    import_zstd_stream_with_vere(stream_in, file_out, prt_extensions, false).await
}
1083
/// Filters files on a zstd stream, and outputs them to given function
///
/// The stream is decoded as zstd-compressed tar. Each regular-file entry is
/// matched against [`is_allowed_file`] (plus the vere binary at `.run`) and,
/// when accepted, forwarded to `file_out`. Vere is always read in full to
/// compute its sha256 hash, but is only forwarded when `unpack_vere` is true.
///
/// Entries may live either directly at the archive root (`./...`) or under one
/// extra directory (`./<patp>/...`); the first accepted entry locks the layout
/// in (`subpath_mode`), and entries using the other layout are skipped with a
/// warning.
///
/// # Errors
///
/// Fails on tar/zstd read errors, on unpack errors from `file_out`, or when no
/// entry was accepted at all ("Pier is empty").
pub async fn import_zstd_stream_with_vere<I: AsyncRead + Send>(
    stream_in: I,
    file_out: &mut (impl AsyncUnpack + ?Sized),
    prt_extensions: &[impl AsRef<str>],
    unpack_vere: bool,
) -> Result<UrbitPier> {
    let stream_in = BufReader::new(stream_in);
    let mut stream_in = ZstdDecoder::new(stream_in);
    let stream_in = pin!(stream_in);
    let ar = async_tar::Archive::new(stream_in);
    let mut entries = ar.entries()?;

    // This allows us to be consistent with parsing paths from `./<patp>/` and `./`
    let mut subpath_mode = None;

    let mut pier = UrbitPier::default();

    while let Some(entry) = entries.next().await {
        let mut entry = entry?;

        trace!("ZSTD {entry:?}");

        // Only file-like entry types carry data we care about; directories,
        // links, etc. are skipped.
        if !matches!(
            entry.header().entry_type(),
            EntryType::Regular
                | EntryType::Continuous
                | EntryType::GNULongName
                | EntryType::GNUSparse
        ) {
            continue;
        }

        let path: PathBuf = (*entry.path()?).into();
        let size = entry.header().size()?;

        let (is_vere, in_subpath) = is_vere(&path);

        trace!("ZSTD: {path:?} is_vere={is_vere} in_subpath={in_subpath}");

        if is_vere {
            // Skip (rather than fail) entries that disagree with the layout
            // established by the first accepted entry.
            if subpath_mode.is_some() && Some(in_subpath) != subpath_mode {
                warn!(
                    "Subpath mode does not match ({in_subpath} vs. {})",
                    subpath_mode.unwrap()
                );
                continue;
            } else {
                subpath_mode = Some(in_subpath);

                // Declared (uncompressed) size guard against oversized binaries.
                if size > MAX_VERE_SIZE {
                    warn!("Vere too large ({size} bytes)",);
                    continue;
                }

                // We need to read vere out, to compute its hash, and skip output, because we will
                // write vere properly later with correct architecture.
                let mut vere = vec![];
                entry.read_to_end(&mut vere).await?;
                pier.vere_hash = Some(sha256::digest(&vere));

                if unpack_vere {
                    file_out
                        .unpack_exec(&path, in_subpath, pin!(Cursor::new(vere)))
                        .await?;
                }

                continue;
            }
        }

        let (is_allowed_file, in_subpath) = is_allowed_file(&path, prt_extensions);

        if is_allowed_file {
            // Same layout-consistency rule as for vere above.
            if subpath_mode.is_some() && Some(in_subpath) != subpath_mode {
                warn!(
                    "Subpath mode does not match ({in_subpath} vs. {})",
                    subpath_mode.unwrap()
                );
                continue;
            } else {
                subpath_mode = Some(in_subpath);

                if size > MAX_SIZE {
                    warn!("File too large ({size} bytes)",);
                    continue;
                }

                file_out.unpack(&path, in_subpath, pin!(entry)).await?;
            }
        }
    }

    // subpath_mode is only ever set when at least one entry was accepted, so
    // `None` here means nothing usable was found in the archive.
    if subpath_mode.is_some() {
        Ok(pier)
    } else {
        Err(anyhow!("Pier is empty").into())
    }
}
1183
1184/// Filters files on a zip file, and outputs them to a given function
1185///
1186/// Note that for zip to work, we need to have a seekable stream, i.e. file.
1187pub async fn import_zip_file<I: AsyncRead + AsyncSeek + Send>(
1188    stream_in: I,
1189    file_out: &mut (impl AsyncUnpack + ?Sized),
1190    prt_extensions: &[impl AsRef<str>],
1191) -> Result<UrbitPier> {
1192    let mut stream_in = BufReader::new(stream_in);
1193    let stream_in = pin!(stream_in);
1194    let mut zip = read::seek::ZipFileReader::new(stream_in).await?;
1195
1196    // This allows us to be consistent with parsing paths from `./<patp>/` and `./`
1197    let mut subpath_mode = None;
1198
1199    let mut pier = UrbitPier::default();
1200
1201    for i in 0.. {
1202        let Ok(mut file) = zip.reader_with_entry(i).await else {
1203            break;
1204        };
1205
1206        // This is not a file, we don't need it
1207        if matches!(file.entry().dir(), Ok(true)) {
1208            continue;
1209        }
1210
1211        let entry = file.entry();
1212
1213        let Ok(path) = entry.filename().as_str().map(PathBuf::from) else {
1214            continue;
1215        };
1216
1217        let (is_vere, in_subpath) = is_vere(&path);
1218
1219        if is_vere {
1220            if subpath_mode.is_some() && Some(in_subpath) != subpath_mode {
1221                warn!(
1222                    "Subpath mode does not match ({in_subpath} vs. {})",
1223                    subpath_mode.unwrap()
1224                );
1225                continue;
1226            } else {
1227                subpath_mode = Some(in_subpath);
1228
1229                if file.entry().uncompressed_size() > MAX_VERE_SIZE {
1230                    warn!(
1231                        "Vere too large ({} bytes)",
1232                        file.entry().uncompressed_size()
1233                    );
1234                    continue;
1235                }
1236
1237                // We need to read vere out, to compute its hash, and skip output, because we will
1238                // write vere properly later with correct architecture.
1239                let mut vere = vec![];
1240                file.read_to_end_checked(&mut vere).await?;
1241                pier.vere_hash = Some(sha256::digest(&vere));
1242                continue;
1243            }
1244        }
1245
1246        let (is_allowed_file, in_subpath) = is_allowed_file(&path, prt_extensions);
1247
1248        if is_allowed_file {
1249            if subpath_mode.is_some() && Some(in_subpath) != subpath_mode {
1250                warn!(
1251                    "Subpath mode does not match ({in_subpath} vs. {})",
1252                    subpath_mode.unwrap()
1253                );
1254                continue;
1255            } else {
1256                subpath_mode = Some(in_subpath);
1257
1258                if file.entry().uncompressed_size() > MAX_SIZE {
1259                    warn!(
1260                        "File too large ({} bytes)",
1261                        file.entry().uncompressed_size()
1262                    );
1263                    continue;
1264                }
1265
1266                file_out.unpack(&path, in_subpath, pin!(file)).await?;
1267            }
1268        }
1269    }
1270
1271    Ok(pier)
1272}
1273
1274/// Compresses a pier directory using zstd compression algorithm.
1275pub async fn export_dir(path: &Path, stream_out: (impl AsyncWrite + Send + Sync)) -> Result<()> {
1276    let stream_out = BufWriter::new(stream_out);
1277    let stream_out = ZstdEncoder::new(stream_out);
1278    let stream_out = pin!(stream_out);
1279
1280    let mut ar = async_tar::Builder::new(stream_out);
1281    ar.append_dir_all(".", path).await?;
1282    ar.finish().await?;
1283    let mut zstd = ar.into_inner().await?;
1284    zstd.close().await?;
1285    zstd.get_pin_mut().flush().await?;
1286
1287    Ok(())
1288}
1289
/// Returns the `(os, arch)` pair this binary was compiled for,
/// e.g. `("linux", "x86_64")`.
pub fn current_os_arch() -> (&'static str, &'static str) {
    (std::env::consts::OS, std::env::consts::ARCH)
}