1use std::{
2 env,
3 fs::{self, metadata},
4 path::Path,
5 process::{Command, ExitStatus, Stdio},
6 sync::mpsc::{self, Sender},
7 thread,
8};
9
10use crate::{exec, pkgx::Pkgx, Extension};
11use anyhow::Error;
12use async_trait::async_trait;
13use fluentci_types::Output;
14use owo_colors::OwoColorize;
15
16pub mod gcs;
17pub mod s3;
18
19pub enum CacheBackendType {
20 CDN,
21 S3,
22 GCS,
23}
24
25#[async_trait]
26pub trait CacheBackend {
27 async fn restore(&self, path: &str) -> Result<(), Error>;
28 async fn save(&self, path: &str) -> Result<(), Error>;
29}
30
31#[derive(Default)]
32pub struct Cache {
33 key: String,
34 path: String,
35}
36
37pub fn detect_cache_backend() -> Option<CacheBackendType> {
38 if let Ok(_) = std::env::var("FLUENTCI_CACHE_CDN_ENDPOINT") {
39 return Some(CacheBackendType::CDN);
40 }
41
42 if let Ok(_) = std::env::var("FLUENTCI_CACHE_S3_ENDPOINT") {
43 return Some(CacheBackendType::S3);
44 }
45
46 if let Ok(_) = std::env::var("FLUENTCI_CACHE_GCS_BUCKET") {
47 return Some(CacheBackendType::GCS);
48 }
49
50 None
51}
52
53pub async fn download(key: &str) -> Result<(), Error> {
54 let default_backend: Option<CacheBackendType> = detect_cache_backend();
55
56 if default_backend.is_none() {
57 println!("-> No cache backend found, skipping download");
58 return Ok(());
59 }
60
61 let cache_file = format!(
62 "{}/.fluentci/cache/{}.tar.gz",
63 dirs::home_dir().unwrap().to_str().unwrap(),
64 key
65 );
66
67 if Path::new(&cache_file).exists() {
68 println!("-> Cache already exists, skipping download");
69 return Ok(());
70 }
71
72 println!(
73 " {} cache for key: {} ...",
74 "Downloading".bright_green(),
75 key.bright_yellow()
76 );
77
78 if let Some(CacheBackendType::CDN) = default_backend {
79 fs::create_dir_all(format!(
80 "{}/.fluentci/cache",
81 dirs::home_dir().unwrap().to_str().unwrap()
82 ))?;
83
84 let prefix = format!("{}/{}", env::consts::OS, env::consts::ARCH);
85 let url = std::env::var("FLUENTCI_CACHE_CDN_ENDPOINT")?;
86 let filename = cache_file.split("/").last().unwrap();
87 let url = format!("{}/{}/{}", url, prefix, filename);
88 let cmd = format!("wget {} -O {}", url, cache_file);
89 println!("-> {}", cmd.bright_cyan());
90
91 let mut child = Command::new("bash")
92 .arg("-c")
93 .arg(&cmd)
94 .stdout(Stdio::inherit())
95 .stderr(Stdio::inherit())
96 .spawn()?;
97
98 child.wait()?;
99
100 return Ok(());
101 }
102
103 let backend: Box<dyn CacheBackend> = match default_backend {
104 Some(CacheBackendType::S3) => Box::new(s3::new().await?),
105 Some(CacheBackendType::GCS) => Box::new(gcs::new().await?),
106 _ => Box::new(s3::new().await?),
107 };
108 backend.restore(&cache_file).await?;
109 Ok(())
110}
111
112pub async fn upload(key: &str) -> Result<(), Error> {
113 let mut default_backend: Option<&str> = None;
114
115 if let Ok(_) = std::env::var("FLUENTCI_CACHE_S3_ENDPOINT") {
116 default_backend = Some("S3");
117 }
118
119 if let Ok(_) = std::env::var("FLUENTCI_CACHE_GCS_ENDPOINT") {
120 default_backend = Some("GCS");
121 }
122
123 if default_backend.is_none() {
124 println!("-> No cache backend found, skipping upload");
125 return Ok(());
126 }
127
128 println!(
129 " {} cache for key: {} ...",
130 "Uploading".bright_green(),
131 key.bright_yellow()
132 );
133 let cache_file = format!(
134 "{}/.fluentci/cache/{}.tar.gz",
135 dirs::home_dir().unwrap().to_str().unwrap(),
136 key
137 );
138 let backend: Box<dyn CacheBackend> = match default_backend {
139 Some("S3") => Box::new(s3::new().await?),
140 Some("GCS") => Box::new(gcs::new().await?),
141 _ => Box::new(s3::new().await?),
142 };
143 backend.save(&cache_file).await?;
144 Ok(())
145}
146
147impl Extension for Cache {
148 fn exec(
149 &mut self,
150 key: &str,
151 tx: Sender<String>,
152 _out: Output,
153 _last_cmd: bool,
154 work_dir: &str,
155 ) -> Result<ExitStatus, Error> {
156 let key_path = key.split(":").collect::<Vec<&str>>();
157
158 if key_path.len() != 2 {
159 return Err(Error::msg("Invalid cache key"));
160 }
161
162 self.key = key_path[0].to_string();
163 self.path = match key_path[1].starts_with("/") {
164 true => key_path[1].to_string(),
165 false => format!("{}/{}", work_dir, key_path[1]),
166 };
167
168 println!("Cache key: {}", key);
169
170 let cache_file = format!(
171 "{}/.fluentci/cache/{}.tar.gz",
172 dirs::home_dir().unwrap().to_str().unwrap(),
173 self.key
174 );
175
176 if !Path::new(&cache_file).exists() {
177 let key = self.key.clone();
178 let (dtx, drx) = mpsc::channel();
179 thread::spawn(move || {
180 let rt = tokio::runtime::Runtime::new().unwrap();
181 rt.block_on(download(&key)).unwrap();
182 dtx.send(0).unwrap();
183 });
184 drx.recv().unwrap();
185 }
186
187 if Path::new(&cache_file).exists() {
188 let parent = Path::new(&self.path).parent().unwrap();
189 let work_dir = String::from(parent.to_str().unwrap());
190
191 fs::create_dir_all(&work_dir)?;
192
193 let cmd = format!("tar xzvf {}", cache_file);
194 exec(&cmd, tx, Output::Stdout, true, &work_dir)?;
195 let label = format!("[{}]", "withCache");
196 println!("{} Cached restored", label.cyan());
197 return Ok(ExitStatus::default());
198 }
199
200 tx.send("Cached restored".to_string())?;
201 Ok(ExitStatus::default())
202 }
203
204 fn setup(&self) -> Result<(), Error> {
205 Pkgx::default().install(vec!["tar", "wget"])?;
206 Ok(())
207 }
208
209 fn post_setup(&self, tx: Sender<String>) -> Result<ExitStatus, Error> {
210 println!("Cache key: {}", self.key);
211 println!("Cache path: {}", self.path);
212
213 fs::create_dir_all(format!(
214 "{}/.fluentci/cache",
215 dirs::home_dir().unwrap().to_str().unwrap()
216 ))?;
217
218 let output_file = format!(
219 "{}/.fluentci/cache/{}.tar.gz",
220 dirs::home_dir().unwrap().to_str().unwrap(),
221 self.key
222 );
223
224 if let Some(parent) = Path::new(&self.path).parent() {
225 let (is_dir, is_file) =
226 metadata(&self.path).map(|metadata| (metadata.is_dir(), metadata.is_file()))?;
227
228 if let Some(file_or_dir) = Path::new(&self.path).file_name() {
229 let cmd = match (is_dir, is_file) {
230 (true, false) => {
231 format!("tar czvf {} {}", output_file, file_or_dir.to_str().unwrap())
232 }
233 (false, true) => {
234 format!("tar czvf {} {}", output_file, file_or_dir.to_str().unwrap())
235 }
236 _ => return Err(Error::msg("Invalid file or directory")),
237 };
238
239 let work_dir = String::from(parent.to_str().unwrap());
240
241 exec(&cmd, tx, Output::Stdout, true, &work_dir)?;
242
243 let key = self.key.clone();
244 let (utx, urx) = mpsc::channel();
245 thread::spawn(move || {
246 let rt = tokio::runtime::Runtime::new().unwrap();
247 rt.block_on(upload(&key)).unwrap();
248 utx.send(0).unwrap();
249 });
250 urx.recv().unwrap();
251
252 return Ok(ExitStatus::default());
253 }
254 return Err(Error::msg("Failed to get file or directory name"));
255 }
256
257 Err(Error::msg("Failed to get parent directory"))
258 }
259}