fluentci_ext/cache/
mod.rs

1use std::{
2    env,
3    fs::{self, metadata},
4    path::Path,
5    process::{Command, ExitStatus, Stdio},
6    sync::mpsc::{self, Sender},
7    thread,
8};
9
10use crate::{exec, pkgx::Pkgx, Extension};
11use anyhow::Error;
12use async_trait::async_trait;
13use fluentci_types::Output;
14use owo_colors::OwoColorize;
15
16pub mod gcs;
17pub mod s3;
18
19pub enum CacheBackendType {
20    CDN,
21    S3,
22    GCS,
23}
24
25#[async_trait]
26pub trait CacheBackend {
27    async fn restore(&self, path: &str) -> Result<(), Error>;
28    async fn save(&self, path: &str) -> Result<(), Error>;
29}
30
31#[derive(Default)]
32pub struct Cache {
33    key: String,
34    path: String,
35}
36
37pub fn detect_cache_backend() -> Option<CacheBackendType> {
38    if let Ok(_) = std::env::var("FLUENTCI_CACHE_CDN_ENDPOINT") {
39        return Some(CacheBackendType::CDN);
40    }
41
42    if let Ok(_) = std::env::var("FLUENTCI_CACHE_S3_ENDPOINT") {
43        return Some(CacheBackendType::S3);
44    }
45
46    if let Ok(_) = std::env::var("FLUENTCI_CACHE_GCS_BUCKET") {
47        return Some(CacheBackendType::GCS);
48    }
49
50    None
51}
52
53pub async fn download(key: &str) -> Result<(), Error> {
54    let default_backend: Option<CacheBackendType> = detect_cache_backend();
55
56    if default_backend.is_none() {
57        println!("-> No cache backend found, skipping download");
58        return Ok(());
59    }
60
61    let cache_file = format!(
62        "{}/.fluentci/cache/{}.tar.gz",
63        dirs::home_dir().unwrap().to_str().unwrap(),
64        key
65    );
66
67    if Path::new(&cache_file).exists() {
68        println!("-> Cache already exists, skipping download");
69        return Ok(());
70    }
71
72    println!(
73        " {} cache for key: {} ...",
74        "Downloading".bright_green(),
75        key.bright_yellow()
76    );
77
78    if let Some(CacheBackendType::CDN) = default_backend {
79        fs::create_dir_all(format!(
80            "{}/.fluentci/cache",
81            dirs::home_dir().unwrap().to_str().unwrap()
82        ))?;
83
84        let prefix = format!("{}/{}", env::consts::OS, env::consts::ARCH);
85        let url = std::env::var("FLUENTCI_CACHE_CDN_ENDPOINT")?;
86        let filename = cache_file.split("/").last().unwrap();
87        let url = format!("{}/{}/{}", url, prefix, filename);
88        let cmd = format!("wget {} -O {}", url, cache_file);
89        println!("-> {}", cmd.bright_cyan());
90
91        let mut child = Command::new("bash")
92            .arg("-c")
93            .arg(&cmd)
94            .stdout(Stdio::inherit())
95            .stderr(Stdio::inherit())
96            .spawn()?;
97
98        child.wait()?;
99
100        return Ok(());
101    }
102
103    let backend: Box<dyn CacheBackend> = match default_backend {
104        Some(CacheBackendType::S3) => Box::new(s3::new().await?),
105        Some(CacheBackendType::GCS) => Box::new(gcs::new().await?),
106        _ => Box::new(s3::new().await?),
107    };
108    backend.restore(&cache_file).await?;
109    Ok(())
110}
111
112pub async fn upload(key: &str) -> Result<(), Error> {
113    let mut default_backend: Option<&str> = None;
114
115    if let Ok(_) = std::env::var("FLUENTCI_CACHE_S3_ENDPOINT") {
116        default_backend = Some("S3");
117    }
118
119    if let Ok(_) = std::env::var("FLUENTCI_CACHE_GCS_ENDPOINT") {
120        default_backend = Some("GCS");
121    }
122
123    if default_backend.is_none() {
124        println!("-> No cache backend found, skipping upload");
125        return Ok(());
126    }
127
128    println!(
129        " {} cache for key: {} ...",
130        "Uploading".bright_green(),
131        key.bright_yellow()
132    );
133    let cache_file = format!(
134        "{}/.fluentci/cache/{}.tar.gz",
135        dirs::home_dir().unwrap().to_str().unwrap(),
136        key
137    );
138    let backend: Box<dyn CacheBackend> = match default_backend {
139        Some("S3") => Box::new(s3::new().await?),
140        Some("GCS") => Box::new(gcs::new().await?),
141        _ => Box::new(s3::new().await?),
142    };
143    backend.save(&cache_file).await?;
144    Ok(())
145}
146
147impl Extension for Cache {
148    fn exec(
149        &mut self,
150        key: &str,
151        tx: Sender<String>,
152        _out: Output,
153        _last_cmd: bool,
154        work_dir: &str,
155    ) -> Result<ExitStatus, Error> {
156        let key_path = key.split(":").collect::<Vec<&str>>();
157
158        if key_path.len() != 2 {
159            return Err(Error::msg("Invalid cache key"));
160        }
161
162        self.key = key_path[0].to_string();
163        self.path = match key_path[1].starts_with("/") {
164            true => key_path[1].to_string(),
165            false => format!("{}/{}", work_dir, key_path[1]),
166        };
167
168        println!("Cache key: {}", key);
169
170        let cache_file = format!(
171            "{}/.fluentci/cache/{}.tar.gz",
172            dirs::home_dir().unwrap().to_str().unwrap(),
173            self.key
174        );
175
176        if !Path::new(&cache_file).exists() {
177            let key = self.key.clone();
178            let (dtx, drx) = mpsc::channel();
179            thread::spawn(move || {
180                let rt = tokio::runtime::Runtime::new().unwrap();
181                rt.block_on(download(&key)).unwrap();
182                dtx.send(0).unwrap();
183            });
184            drx.recv().unwrap();
185        }
186
187        if Path::new(&cache_file).exists() {
188            let parent = Path::new(&self.path).parent().unwrap();
189            let work_dir = String::from(parent.to_str().unwrap());
190
191            fs::create_dir_all(&work_dir)?;
192
193            let cmd = format!("tar xzvf {}", cache_file);
194            exec(&cmd, tx, Output::Stdout, true, &work_dir)?;
195            let label = format!("[{}]", "withCache");
196            println!("{} Cached restored", label.cyan());
197            return Ok(ExitStatus::default());
198        }
199
200        tx.send("Cached restored".to_string())?;
201        Ok(ExitStatus::default())
202    }
203
204    fn setup(&self) -> Result<(), Error> {
205        Pkgx::default().install(vec!["tar", "wget"])?;
206        Ok(())
207    }
208
209    fn post_setup(&self, tx: Sender<String>) -> Result<ExitStatus, Error> {
210        println!("Cache key: {}", self.key);
211        println!("Cache path: {}", self.path);
212
213        fs::create_dir_all(format!(
214            "{}/.fluentci/cache",
215            dirs::home_dir().unwrap().to_str().unwrap()
216        ))?;
217
218        let output_file = format!(
219            "{}/.fluentci/cache/{}.tar.gz",
220            dirs::home_dir().unwrap().to_str().unwrap(),
221            self.key
222        );
223
224        if let Some(parent) = Path::new(&self.path).parent() {
225            let (is_dir, is_file) =
226                metadata(&self.path).map(|metadata| (metadata.is_dir(), metadata.is_file()))?;
227
228            if let Some(file_or_dir) = Path::new(&self.path).file_name() {
229                let cmd = match (is_dir, is_file) {
230                    (true, false) => {
231                        format!("tar czvf {} {}", output_file, file_or_dir.to_str().unwrap())
232                    }
233                    (false, true) => {
234                        format!("tar czvf {} {}", output_file, file_or_dir.to_str().unwrap())
235                    }
236                    _ => return Err(Error::msg("Invalid file or directory")),
237                };
238
239                let work_dir = String::from(parent.to_str().unwrap());
240
241                exec(&cmd, tx, Output::Stdout, true, &work_dir)?;
242
243                let key = self.key.clone();
244                let (utx, urx) = mpsc::channel();
245                thread::spawn(move || {
246                    let rt = tokio::runtime::Runtime::new().unwrap();
247                    rt.block_on(upload(&key)).unwrap();
248                    utx.send(0).unwrap();
249                });
250                urx.recv().unwrap();
251
252                return Ok(ExitStatus::default());
253            }
254            return Err(Error::msg("Failed to get file or directory name"));
255        }
256
257        Err(Error::msg("Failed to get parent directory"))
258    }
259}