1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use anyhow::{bail, Context, Result};
5use bytes::Bytes;
6use log::{info, warn};
7
8use crate::transport::Transport;
9use crate::transport::ftp::TransportFtp;
10use crate::transport::local::LocalTransport;
11use crate::transport::sftp::{SftpParameters, TransportSftp};
12
13#[derive(Debug)]
20pub struct FileStore {
21 transports: Vec<Box<dyn Transport>>,
23}
24
25
26impl FileStore {
27 pub async fn open(urls: &[String]) -> Result<Arc<FileStore>> {
29 let mut transports = vec![];
30 for url in urls {
31 transports.push(Self::create_transport(url, None).await?)
32 }
33 info!("FileStore initialized with {} writable transport(s)", transports.len());
34 Ok(Arc::new(Self {
35 transports
36 }))
37 }
38
39 pub async fn with_limit_retries(url: &str) -> Result<Arc<FileStore>> {
41 Ok(Arc::new(Self {
42 transports: vec![Self::create_transport(url, Some(1)).await?],
43 }))
44 }
45
46 async fn create_transport(address: &str, connection_attempts: Option<usize>) -> Result<Box<dyn Transport>> {
47 let url: url::Url = address.parse()?;
48
49 let mut base = match url.path() {
50 "" => "/",
51 other => other,
52 }.to_string();
53
54 let host = url.host_str();
55 if host == Some(".") {
56 base = format!(".{base}");
57 }
58 let port = url.port();
59
60 let password = url.password().map(|password|{
61 percent_encoding::percent_decode_str(password).decode_utf8_lossy().to_string()
62 });
63
64 let read_only = url.query_pairs().find_map(|(name, value)| {
65 if name == "read_only" {
66 Some(read_bool(&value))
67 } else {
68 None
69 }
70 }).unwrap_or(false);
71
72 match url.scheme() {
76 "file" => {
77 if url.has_host() {
78 bail!("Local file connections can't specify a host.");
79 }
80
81 let path = base.parse()?;
90 Ok(Box::new(LocalTransport::new(path, read_only)))
91 }
92 "azure" => {
93 use crate::transport::azure::{AzureParameters, TransportAzure};
94 let mut parameters = AzureParameters::default();
95 for (name, value) in url.query_pairs() {
96 match name.as_ref() {
97 "allow_directory_access" => parameters.allow_directory_access = read_bool(&value),
98 "use_default_credentials" => parameters.use_default_credentials = read_bool(&value),
99 "emulator" => parameters.emulator = read_bool(&value),
100 "access_key" => parameters.access_key = value.to_string(),
101 "tenant_id" => parameters.tenant_id = value.to_string(),
102 "client_id" => parameters.client_id = value.to_string(),
103 "client_secret" => parameters.client_secret = value.to_string(),
104 _ => {}
105 }
106 }
107
108 let host = match url.host_str() {
110 Some(host) => host.to_owned(),
111 None => bail!("a host must be provided for azure connections"),
112 };
113 let base = url.path().to_owned();
114
115 Ok(Box::new(TransportAzure::new(host, base, parameters, connection_attempts, read_only).await?))
116 },
117 "s3" => {
118 use crate::transport::s3::{S3Parameters, TransportS3};
119 let mut parameters = S3Parameters::default();
120 for (name, value) in url.query_pairs() {
121 match name.as_ref() {
122 "use_ssl" => parameters.use_ssl = read_bool(&value),
123 "verify" => parameters.verify = read_bool(&value),
124 "boto_defaults" => parameters.boto_defaults = read_bool(&value),
125 "aws_region" => parameters.aws_region = Some(value.to_string()),
126 "s3_bucket" => parameters.s3_bucket = value.to_string(),
127 "compatability" => parameters.compatability = read_bool(&value),
128 _ => {}
129 }
130 }
131
132 let user = match url.username() {
134 "" => None,
135 value => Some(value.to_owned())
136 };
137
138 Ok(Box::new(TransportS3::new(base, host.map(str::to_string), port, user, password, connection_attempts, parameters, read_only).await?))
139 }
140 "ftp" | "ftps" => {
141 let host = match host {
142 Some(host) => host.to_owned(),
143 None => bail!("A host must be provided for ftp transport")
144 };
145
146 let user = match url.username() {
147 "" => None,
148 value => Some(value.to_owned())
149 };
150
151 if url.scheme().eq_ignore_ascii_case("ftps") {
152 Ok(Box::new(TransportFtp::new_secure(connection_attempts, &base, host, port.unwrap_or(21), user, password, read_only).await?))
153 } else {
154 Ok(Box::new(TransportFtp::new(connection_attempts, &base, host, port.unwrap_or(21), user, password, read_only).await?))
155 }
156 }
157 "sftp" => {
158 let host = match host {
159 Some(host) => host.to_owned(),
160 None => bail!("A host must be provided for ftp transport")
161 };
162 let user = url.username().to_owned();
163
164 let mut params = SftpParameters::default();
165 for (name, value) in url.query_pairs() {
166 match name.as_ref() {
167 "private_key" => params.private_key = Some(value.to_string()),
168 "private_key_password" => params.private_key_password = Some(value.to_string()),
169 "validate_host" => params.validate_host = read_bool(&value),
170 _ => {}
171 }
172 }
173
174 Ok(Box::new(TransportSftp::new(base, host, password, user, port.unwrap_or(22), connection_attempts, params, read_only).await?))
175 }
176 _ => {
177 bail!("Not an accepted filestore scheme: {}", url.scheme());
178 }
179 }
180 }
181
182 pub async fn put(&self, name: &str, body: &Bytes) -> Result<()> {
185 for transport in &self.transports {
186 if transport.read_only() {
187 continue;
189 }
190 transport.put(name, body).await?;
191 }
192 Ok(())
193 }
194
195 pub async fn exists(&self, name: &str) -> Result<bool> {
198 let mut last_error = None;
199 for transport in &self.transports {
200 match transport.exists(name).await {
201 Ok(true) => return Ok(true),
202 Ok(false) => continue,
203 Err(err) => {
204 last_error = Some(err);
205 continue
206 },
207 }
208 }
209 if let Some(error) = last_error {
210 return Err(error).context("Transport errors");
211 }
212 return Ok(false)
213 }
214
215 pub async fn get(&self, name: &str) -> Result<Option<Vec<u8>>> {
218 let mut last_error = None;
219 for transport in &self.transports {
220 match transport.get(name).await {
221 Ok(bytes) => {
222 return Ok(bytes)
223 },
224 Err(err) => {
225 warn!("error fetching blob [{name}] from transport {transport:?}: {err}");
226 last_error = Some(err);
227 continue
228 },
229 }
230 }
231 match last_error {
232 Some(error) => Err(error).context("All transports failed to fetch"),
233 None => Ok(None)
234 }
235 }
236
237 pub async fn download(&self, name: &str, path: &Path) -> Result<()> {
240 let mut errors = vec![];
241 for transport in &self.transports {
242 match transport.download(name, path).await {
243 Ok(()) => {
244 return Ok(())
245 },
246 Err(err) => {
247 errors.push(format!("Could not download file: [{name}] from {transport:?}: {err}"));
248 continue
249 },
250 }
251 }
252 if errors.is_empty() {
253 bail!("All transports failed to fetch [{name}]")
254 }
255 Err(anyhow::anyhow!(errors.join("\n")).context("All transports failed to fetch"))
256 }
257
258 pub async fn upload(&self, path: &Path, name: &str) -> Result<()> {
261 let mut last_error = None;
262 for transport in &self.transports {
263 if transport.read_only() {
264 continue;
266 }
267
268 if let Err(err) = transport.upload(path, name).await {
269 last_error = Some(err);
270 }
271 }
272 match last_error {
273 Some(error) => Err(error).context("A transport failed to upload"),
274 None => Ok(())
275 }
276 }
277
278 pub async fn upload_batch(&self, local_remote_tuples: &[(&Path, &str)]) -> Vec<(PathBuf, String, String)> {
280 let mut failed_tuples = vec![];
281 for (src_path, dst_path) in local_remote_tuples {
282 if let Err(error) = self.upload(src_path, dst_path).await {
283 failed_tuples.push((src_path.to_path_buf(), dst_path.to_string(), format!("{error:?}")));
284 }
285 }
286 return failed_tuples
287 }
288
289 pub async fn stream(&self, name: &str) -> Result<(u64, tokio::sync::mpsc::Receiver<Result<Bytes, std::io::Error>>)> {
292 let mut last_error = None;
293 for transport in &self.transports {
294 match transport.stream(name).await {
295 Ok((size, stream)) => return Ok((size, stream)),
296 Err(err) => last_error = Some(err),
297 }
298 }
299 match last_error {
300 Some(err) => Err(err),
301 None => bail!("No transports could stream file"),
302 }
303 }
304
305 pub async fn delete(&self, name: &str) -> Result<()> {
308 for transport in &self.transports {
309 if transport.read_only() {
310 continue;
312 }
313 transport.delete(name).await?;
314 }
315 Ok(())
316 }
317}
318
/// Interprets a URL query value as a boolean flag.
///
/// `true` (in any ASCII letter case) and `1` are truthy; every other value,
/// including the empty string, is `false`.
fn read_bool(value: &str) -> bool {
    let is_true_word = value.eq_ignore_ascii_case("true");
    let is_one = value == "1";
    is_true_word || is_one
}