// assemblyline_filestore/lib.rs

1//! A library that provides adaptors to blob storage for the assemblyline platform.
2#![warn(missing_docs, non_ascii_idents, trivial_numeric_casts,
3    unused_crate_dependencies, noop_method_call, single_use_lifetimes, trivial_casts,
4    unused_lifetimes, nonstandard_style, variant_size_differences)]
5#![deny(keyword_idents)]
6// #![warn(clippy::missing_docs_in_private_items)]
7#![allow(
8    // allow these as they sometimes improve clarity
9    clippy::needless_return,
10    clippy::collapsible_else_if
11)]
12
13mod filestore;
14mod transport;
15pub mod errors;
16
17pub use filestore::FileStore;
18
19#[cfg(test)]
20mod tests;
21
22
// from __future__ import annotations

// import json
// import logging
// from typing import TYPE_CHECKING, AnyStr, Optional, Tuple
// from urllib.parse import parse_qs, unquote, urlparse

// import elasticapm
// from assemblyline.common.exceptions import get_stacktrace_info
// from assemblyline.filestore.transport.azure import TransportAzure
// from assemblyline.filestore.transport.base import TransportException
// from assemblyline.filestore.transport.ftp import TransportFTP
// from assemblyline.filestore.transport.http import TransportHTTP
// from assemblyline.filestore.transport.local import TransportLocal
// from assemblyline.filestore.transport.s3 import TransportS3
// from assemblyline.filestore.transport.sftp import TransportSFTP

// if TYPE_CHECKING:
//     from assemblyline.filestore.transport.base import Transport


// class FileStoreException(Exception):
//     pass


// class CorruptedFileStoreException(Exception):
//     pass


// def _get_extras(parsed_dict, valid_str_keys=None, valid_bool_keys=None):
//     if not valid_str_keys:
//         valid_str_keys = []
//     if not valid_bool_keys:
//         valid_bool_keys = []

//     out = {}
//     for k, v in parsed_dict.items():
//         if k in valid_bool_keys:
//             if v[0].lower() == 'true':
//                 out[k] = True
//             elif v[0].lower() != 'true':
//                 out[k] = False
//         if k in valid_str_keys:
//             out[k] = v[0]

//     return out


// def create_transport(url, connection_attempts=None):
//     """
//     Transports are being initiated using a URL. They follow the normal URL format:
//     scheme://user:pass@host:port/path/to/file

//     In this example, it will extract the following parameters:
//     scheme: scheme
//     host: host
//     user: user
//     password: pass
//     port: port
//     base: /path/to/file

//     Certain transports can have extra parameters, those parameters need to be specified in the query part of the URL.
//     e.g.: sftp://host.com/path/to/file?private_key=/etc/ssl/pkey&private_key_pass=pass&validate_host=true
//     scheme: sftp
//     host: host.com
//     user:
//     password:
//     private_key: /etc/ssl/pkey
//     private_key_pass: pass
//     validate_host: True

//     NOTE: For transports with extra parameters, only specific extra parameters are allowed. This is the list of extra
//     parameters allowed:

//         ftp: use_tls (bool)
//         http: pki (string)
//         sftp: private_key (string), private_key_pass (string), validate_host (bool)
//         s3: aws_region (string), s3_bucket (string), use_ssl (bool), verify (bool)
//         file: normalize (bool)
//         azure: access_key (string), tenant_id (string), client_id (string), client_secret (string), allow_directory_access (bool), use_default_credentials (bool)

//     """

//     parsed = urlparse(url)

//     base = parsed.path or '/'
//     host = parsed.hostname
//     if host == ".":
//         base = "%s%s" % (host, base)
//     port = parsed.port
//     if parsed.password:
//         password = unquote(parsed.password)
//     else:
//         password = ''
//     user = parsed.username or ''

//     scheme = parsed.scheme.lower()
//     if scheme == 'ftp' or scheme == 'ftps':
//         valid_bool_keys = ['use_tls']
//         extras = _get_extras(parse_qs(parsed.query), valid_bool_keys=valid_bool_keys)
//         if scheme == 'ftps':
//             extras['use_tls'] = True

//         t = TransportFTP(base=base, host=host, password=password, user=user, port=port, **extras)

//     elif scheme == "sftp":
//         valid_str_keys = ['private_key', 'private_key_pass']
//         valid_bool_keys = ['validate_host']
//         extras = _get_extras(parse_qs(parsed.query), valid_str_keys=valid_str_keys, valid_bool_keys=valid_bool_keys)

//         t = TransportSFTP(base=base, host=host, password=password, user=user, port=port, **extras)

//     elif scheme == 'http' or scheme == 'https':
//         valid_str_keys = ['pki']
//         valid_bool_keys = ['verify']
//         extras = _get_extras(parse_qs(parsed.query), valid_str_keys=valid_str_keys, valid_bool_keys=valid_bool_keys)

//         t = TransportHTTP(scheme=scheme, base=base, host=host, password=password, user=user, port=port, **extras)


//     elif scheme == 's3':
//         valid_str_keys = ['aws_region', 's3_bucket']
//         valid_bool_keys = ['use_ssl', 'verify', 'boto_defaults']
//         extras = _get_extras(parse_qs(parsed.query), valid_str_keys=valid_str_keys, valid_bool_keys=valid_bool_keys)

//         # If user/password not specified, access might be dictated by IAM roles
//         if not user and not password:
//             user, password = None, None

//         t = TransportS3(base=base, host=host, port=port, accesskey=user, secretkey=password,
//                         connection_attempts=connection_attempts, **extras)


//     else:
//         raise FileStoreException("Unknown transport: %s" % scheme)

//     return t


// class FileStore(object):
//     def __init__(self, *transport_urls, connection_attempts=None):
//         self.log = logging.getLogger('assemblyline.transport')
//         self.transport_urls = transport_urls
//         self.transports = [create_transport(url, connection_attempts) for url in transport_urls]
//         self.local_transports = [
//             t for t in self.transports if isinstance(t, TransportLocal)
//         ]

//     def __eq__(self, obj: FileStore) -> bool:
//         return self.transport_urls == obj.transport_urls

//     def __enter__(self):
//         return self

//     def __exit__(self, ex_type, exc_val, exc_tb):
//         self.close()

//     def __str__(self):
//         return ', '.join(str(t) for t in self.transports)

//     def close(self):
//         for t in self.transports:
//             try:
//                 t.close()
//             except Exception as ex:
//                 trace = get_stacktrace_info(ex)
//                 self.log.warning('Transport problem: %s', trace)

//     @elasticapm.capture_span(span_type='filestore')
//     def delete(self, path: str, location='all'):
//         with elasticapm.capture_span(name='delete', span_type='filestore', labels={'path': path}):
//             for t in self.slice(location):
//                 try:
//                     t.delete(path)
//                 except Exception as ex:
//                     trace = get_stacktrace_info(ex)
//                     self.log.info('Transport problem: %s', trace)

//     @elasticapm.capture_span(span_type='filestore')
//     def download(self, src_path: str, dest_path: str, location='any'):
//         successful = False
//         transports = []
//         download_errors = []
//         for t in self.slice(location):
//             try:
//                 t.download(src_path, dest_path)
//                 transports.append(t)
//                 successful = True
//                 break
//             except Exception as ex:
//                 download_errors.append((str(t), str(ex)))

//         if not successful:
//             raise FileStoreException('No transport succeeded => %s' % json.dumps(download_errors))
//         return transports

//     @elasticapm.capture_span(span_type='filestore')
//     def exists(self, path, location='any') -> list[Transport]:
//         transports = []
//         for t in self.slice(location):
//             try:
//                 if t.exists(path):
//                     transports.append(t)
//                     if location == 'any':
//                         break
//             except Exception as ex:
//                 trace = get_stacktrace_info(ex)
//                 self.log.warning('Transport problem: %s', trace)
//         return transports

//     @elasticapm.capture_span(span_type='filestore')
//     def get(self, path: str, location='any') -> Optional[bytes]:
//         for t in self.slice(location):
//             try:
//                 return t.get(path)
//             except TransportException as ex:
//                 if isinstance(ex.cause, FileNotFoundError):
//                     pass
//                 else:
//                     trace = get_stacktrace_info(ex)
//                     self.log.warning('Transport problem: %s', trace)
//             except Exception as ex:
//                 trace = get_stacktrace_info(ex)
//                 self.log.warning('Transport problem: %s', trace)
//         return None

//     @elasticapm.capture_span(span_type='filestore')
//     def put(self, dst_path: str, content: AnyStr, location='all', force=False) -> list[Transport]:
//         transports = []
//         for t in self.slice(location):
//             if force or not t.exists(dst_path):
//                 transports.append(t)
//                 t.put(dst_path, content)
//                 if not t.exists(dst_path):
//                     raise FileStoreException('File transfer failed. Remote file does not '
//                                              'exist for %s on %s (%s)' % (dst_path, location, t))
//         return transports

//     def slice(self, location):
//         start, end = {
//             'all': (0, len(self.transports)),
//             'any': (0, len(self.transports)),
//             'far': (-1, len(self.transports)),
//             'near': (0, 1),
//         }[location]

//         transports = self.transports[start:end]
//         assert (len(transports) >= 1)
//         return transports

//     @elasticapm.capture_span(span_type='filestore')
//     def upload(self, src_path: str, dst_path: str, location='all', force=False, verify=False) -> list[Transport]:
//         transports = []
//         for t in self.slice(location):
//             if force or not t.exists(dst_path):
//                 transports.append(t)
//                 t.upload(src_path, dst_path)
//                 if verify and not t.exists(dst_path):
//                     raise FileStoreException('File transfer failed. Remote file does not '
//                                              'exist for %s on %s (%s)' % (dst_path, location, t))
//         return transports

//     @elasticapm.capture_span(span_type='filestore')
//     def upload_batch(self, local_remote_tuples, location='all') -> list[Tuple[str, str, str]]:
//         failed_tuples = []
//         for (src_path, dst_path) in local_remote_tuples:
//             try:
//                 self.upload(src_path, dst_path, location)
//             except Exception as ex:
//                 trace = get_stacktrace_info(ex)
//                 failed_tuples.append((src_path, dst_path, trace))
//         return failed_tuples