// assemblyline_filestore/filestore.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3
4use anyhow::{bail, Context, Result};
5use bytes::Bytes;
6use log::{info, warn};
7
8use crate::transport::Transport;
9use crate::transport::ftp::TransportFtp;
10use crate::transport::local::LocalTransport;
11use crate::transport::sftp::{SftpParameters, TransportSftp};
12
13/// An abstract interface over one or more storage transports.
14///
15/// Supports separate writable and read-only transport pools. Write operations
16/// (upload, put, delete) only target writable transports. Read operations
17/// (download, get, exists, stream) search both pools, with read-only transports
18/// used as a fallback for data that hasn't been migrated to the primary storage.
19#[derive(Debug)]
20pub struct FileStore {
21    /// Transports that support both read and write operations
22    transports: Vec<Box<dyn Transport>>,
23}
24
25
26impl FileStore {
27    /// Open all urls with default parameters
28    pub async fn open(urls: &[String]) -> Result<Arc<FileStore>> {
29        let mut transports = vec![];
30        for url in urls {
31            transports.push(Self::create_transport(url, None).await?)
32        }
33        info!("FileStore initialized with {} writable transport(s)", transports.len());
34        Ok(Arc::new(Self {
35            transports
36        }))
37    }
38
39    /// Open a single url with retrying disabled
40    pub async fn with_limit_retries(url: &str) -> Result<Arc<FileStore>> {
41        Ok(Arc::new(Self {
42            transports: vec![Self::create_transport(url, Some(1)).await?],
43        }))
44    }
45
46    async fn create_transport(address: &str, connection_attempts: Option<usize>) -> Result<Box<dyn Transport>> {
47        let url: url::Url = address.parse()?;
48
49        let mut base = match url.path() {
50            "" => "/",
51            other => other,
52        }.to_string();
53
54        let host = url.host_str();
55        if host == Some(".") {
56            base = format!(".{base}");
57        }
58        let port = url.port();
59
60        let password = url.password().map(|password|{
61            percent_encoding::percent_decode_str(password).decode_utf8_lossy().to_string()
62        });
63
64        let read_only = url.query_pairs().find_map(|(name, value)| {
65            if name == "read_only" {
66                Some(read_bool(&value))
67            } else {
68                None
69            }
70        }).unwrap_or(false);
71
72        // user = parsed.username or ''
73
74
75        match url.scheme() {
76            "file" => {
77                if url.has_host() {
78                    bail!("Local file connections can't specify a host.");
79                }
80
81                // for (name, value) in url.query_pairs() {
82
83                // }
84                // valid_bool_keys = ['normalize']
85                // extras = _get_extras(parse_qs(parsed.query), valid_bool_keys=valid_bool_keys)
86
87                // t = TransportLocal(base=base, **extras)
88
89                let path = base.parse()?;
90                Ok(Box::new(LocalTransport::new(path, read_only)))
91            }
92            "azure" => {
93                use crate::transport::azure::{AzureParameters, TransportAzure};
94                let mut parameters = AzureParameters::default();
95                for (name, value) in url.query_pairs() {
96                    match name.as_ref() {
97                        "allow_directory_access" => parameters.allow_directory_access = read_bool(&value),
98                        "use_default_credentials" => parameters.use_default_credentials = read_bool(&value),
99                        "emulator" => parameters.emulator = read_bool(&value),
100                        "access_key" => parameters.access_key = value.to_string(),
101                        "tenant_id" => parameters.tenant_id = value.to_string(),
102                        "client_id" => parameters.client_id = value.to_string(),
103                        "client_secret" => parameters.client_secret = value.to_string(),
104                        _ => {}
105                    }
106                }
107
108                // t = TransportAzure(base=base, host=host, connection_attempts=connection_attempts, **extras)
109                let host = match url.host_str() {
110                    Some(host) => host.to_owned(),
111                    None => bail!("a host must be provided for azure connections"),
112                };
113                let base = url.path().to_owned();
114
115                Ok(Box::new(TransportAzure::new(host, base, parameters, connection_attempts, read_only).await?))
116            },
117            "s3" => {
118                use crate::transport::s3::{S3Parameters, TransportS3};
119                let mut parameters = S3Parameters::default();
120                for (name, value) in url.query_pairs() {
121                    match name.as_ref() {
122                        "use_ssl" => parameters.use_ssl = read_bool(&value),
123                        "verify" => parameters.verify = read_bool(&value),
124                        "boto_defaults" => parameters.boto_defaults = read_bool(&value),
125                        "aws_region" => parameters.aws_region = Some(value.to_string()),
126                        "s3_bucket" => parameters.s3_bucket = value.to_string(),
127                        "compatability" => parameters.compatability = read_bool(&value),
128                        _ => {}
129                    }
130                }
131
132                // If user/password not specified, access might be dictated by IAM roles
133                let user = match url.username() {
134                    "" => None,
135                    value => Some(value.to_owned())
136                };
137
138                Ok(Box::new(TransportS3::new(base, host.map(str::to_string), port, user, password, connection_attempts, parameters, read_only).await?))
139            }
140            "ftp" | "ftps" => {
141                let host = match host {
142                    Some(host) => host.to_owned(),
143                    None => bail!("A host must be provided for ftp transport")
144                };
145
146                let user = match url.username() {
147                    "" => None,
148                    value => Some(value.to_owned())
149                };
150
151                if url.scheme().eq_ignore_ascii_case("ftps") {
152                    Ok(Box::new(TransportFtp::new_secure(connection_attempts, &base, host, port.unwrap_or(21), user, password, read_only).await?))
153                } else {
154                    Ok(Box::new(TransportFtp::new(connection_attempts, &base, host, port.unwrap_or(21), user, password, read_only).await?))
155                }
156            }
157            "sftp" => {
158                let host = match host {
159                    Some(host) => host.to_owned(),
160                    None => bail!("A host must be provided for ftp transport")
161                };
162                let user = url.username().to_owned();
163
164                let mut params = SftpParameters::default();
165                for (name, value) in url.query_pairs() {
166                    match name.as_ref() {
167                        "private_key" => params.private_key = Some(value.to_string()),
168                        "private_key_password" => params.private_key_password = Some(value.to_string()),
169                        "validate_host" => params.validate_host = read_bool(&value),
170                        _ => {}
171                    }
172                }
173
174                Ok(Box::new(TransportSftp::new(base, host, password, user, port.unwrap_or(22), connection_attempts, params, read_only).await?))
175            }
176            _ => {
177                bail!("Not an accepted filestore scheme: {}", url.scheme());
178            }
179        }
180    }
181
182    /// Upload a buffer to all writable transports.
183    /// Read-only transports are skipped.
184    pub async fn put(&self, name: &str, body: &Bytes) -> Result<()> {
185        for transport in &self.transports {
186            if transport.read_only() {
187                // Skip on read-only transports
188                continue;
189            }
190            transport.put(name, body).await?;
191        }
192        Ok(())
193    }
194
195    /// Check if a given blob is defined in any transport (writable or read-only).
196    /// Errors will be supressed as long as any transport contains the file.
197    pub async fn exists(&self, name: &str) -> Result<bool> {
198        let mut last_error = None;
199        for transport in &self.transports {
200            match transport.exists(name).await {
201                Ok(true) => return Ok(true),
202                Ok(false) => continue,
203                Err(err) => {
204                    last_error = Some(err);
205                    continue
206                },
207            }
208        }
209        if let Some(error) = last_error {
210            return Err(error).context("Transport errors");
211        }
212        return Ok(false)
213    }
214
215    /// Pull blob to in memory buffer from any transport (writable or read-only).
216    /// Returns errors only if all transports fail, otherwise errors will be logged as warnings.
217    pub async fn get(&self, name: &str) -> Result<Option<Vec<u8>>> {
218        let mut last_error = None;
219        for transport in &self.transports {
220            match transport.get(name).await {
221                Ok(bytes) => {
222                    return Ok(bytes)
223                },
224                Err(err) => {
225                    warn!("error fetching blob [{name}] from transport {transport:?}: {err}");
226                    last_error = Some(err);
227                    continue
228                },
229            }
230        }
231        match last_error {
232            Some(error) => Err(error).context("All transports failed to fetch"),
233            None => Ok(None)
234        }
235    }
236
237    /// Download a blob and write it to a local file from any transport (writable or read-only).
238    /// If the file does not exist it will be created. If it does exist it will be replaced.
239    pub async fn download(&self, name: &str, path: &Path) -> Result<()> {
240        let mut errors = vec![];
241        for transport in &self.transports {
242            match transport.download(name, path).await {
243                Ok(()) => {
244                    return Ok(())
245                },
246                Err(err) => {
247                    errors.push(format!("Could not download file: [{name}] from {transport:?}: {err}"));
248                    continue
249                },
250            }
251        }
252        if errors.is_empty() {
253            bail!("All transports failed to fetch [{name}]")
254        }
255        Err(anyhow::anyhow!(errors.join("\n")).context("All transports failed to fetch"))
256    }
257
258    /// Upload a local file as a named blob to writable transports only.
259    /// Read-only transports are skipped.
260    pub async fn upload(&self, path: &Path, name: &str) -> Result<()> {
261        let mut last_error = None;
262        for transport in &self.transports {
263            if transport.read_only() {
264                // Skip on read-only transports
265                continue;
266            }
267
268            if let Err(err) = transport.upload(path, name).await {
269                last_error = Some(err);
270            }
271        }
272        match last_error {
273            Some(error) => Err(error).context("A transport failed to upload"),
274            None => Ok(())
275        }
276    }
277
278    /// Upload a collection of local files to writable transports only.
279    pub async fn upload_batch(&self, local_remote_tuples: &[(&Path, &str)]) -> Vec<(PathBuf, String, String)> {
280        let mut failed_tuples = vec![];
281        for (src_path, dst_path) in local_remote_tuples {
282            if let Err(error) = self.upload(src_path, dst_path).await {
283                failed_tuples.push((src_path.to_path_buf(), dst_path.to_string(), format!("{error:?}")));
284            }
285        }
286        return failed_tuples
287    }
288
289    /// Stream the content of a blob from any transport (writable or read-only).
290    /// Returns the total expected length of the stream and a message receiver of data buffers.
291    pub async fn stream(&self, name: &str) -> Result<(u64, tokio::sync::mpsc::Receiver<Result<Bytes, std::io::Error>>)> {
292        let mut last_error = None;
293        for transport in &self.transports {
294            match transport.stream(name).await {
295                Ok((size, stream)) => return Ok((size, stream)),
296                Err(err) => last_error = Some(err),
297            }
298        }
299        match last_error {
300            Some(err) => Err(err),
301            None => bail!("No transports could stream file"),
302        }
303    }
304
305    /// Remove a blob from writable storage only.
306    /// Read-only transports are skipped.
307    pub async fn delete(&self, name: &str) -> Result<()> {
308        for transport in &self.transports {
309            if transport.read_only() {
310                // Skip on read-only transports
311                continue;
312            }
313            transport.delete(name).await?;
314        }
315        Ok(())
316    }
317}
318
/// Interpret a url query value as a boolean flag.
///
/// Only `"1"` and `"true"` (compared case-insensitively) count as true;
/// every other value, including the empty string, is false.
fn read_bool(value: &str) -> bool {
    value == "1" || value.eq_ignore_ascii_case("true")
}