assemblyline_filestore/lib.rs
1//! A library that provides adaptors to blob storage for the assemblyline platform.
2#![warn(missing_docs, non_ascii_idents, trivial_numeric_casts,
3 unused_crate_dependencies, noop_method_call, single_use_lifetimes, trivial_casts,
4 unused_lifetimes, nonstandard_style, variant_size_differences)]
5#![deny(keyword_idents)]
6// #![warn(clippy::missing_docs_in_private_items)]
7#![allow(
8 // allow these as they sometimes improve clarity
9 clippy::needless_return,
10 clippy::collapsible_else_if
11)]
12
13mod filestore;
14mod transport;
15pub mod errors;
16
17pub use filestore::FileStore;
18
19#[cfg(test)]
20mod tests;
21
22
23// from __future__ import annotations
24
25// import json
26// import logging
27// from typing import TYPE_CHECKING, AnyStr, Optional, Tuple
28// from urllib.parse import parse_qs, unquote, urlparse
29
30// import elasticapm
31// from assemblyline.common.exceptions import get_stacktrace_info
32// from assemblyline.filestore.transport.azure import TransportAzure
33// from assemblyline.filestore.transport.base import TransportException
34// from assemblyline.filestore.transport.ftp import TransportFTP
35// from assemblyline.filestore.transport.http import TransportHTTP
36// from assemblyline.filestore.transport.local import TransportLocal
37// from assemblyline.filestore.transport.s3 import TransportS3
38// from assemblyline.filestore.transport.sftp import TransportSFTP
39
40// if TYPE_CHECKING:
41// from assemblyline.filestore.transport.base import Transport
42
43
44// class FileStoreException(Exception):
45// pass
46
47
48// class CorruptedFileStoreException(Exception):
49// pass
50
51
52// def _get_extras(parsed_dict, valid_str_keys=None, valid_bool_keys=None):
53// if not valid_str_keys:
54// valid_str_keys = []
55// if not valid_bool_keys:
56// valid_bool_keys = []
57
58// out = {}
59// for k, v in parsed_dict.items():
60// if k in valid_bool_keys:
61// if v[0].lower() == 'true':
62// out[k] = True
63// elif v[0].lower() != 'true':
64// out[k] = False
65// if k in valid_str_keys:
66// out[k] = v[0]
67
68// return out
69
70
71// def create_transport(url, connection_attempts=None):
72// """
73// Transports are instantiated from a URL. The URL follows the normal URL format:
74// scheme://user:pass@host:port/path/to/file
75
76// In this example, it will extract the following parameters:
77// scheme: scheme
78// host: host
79// user: user
80// password: pass
81// port: port
82// base: /path/to/file
83
84// Certain transports can have extra parameters, those parameters need to be specified in the query part of the URL.
85// e.g.: sftp://host.com/path/to/file?private_key=/etc/ssl/pkey&private_key_pass=pass&validate_host=true
86// scheme: sftp
87// host: host.com
88// user:
89// password:
90// private_key: /etc/ssl/pkey
91// private_key_pass: pass
92// validate_host: True
93
94// NOTE: For transports with extra parameters, only specific extra parameters are allowed. This is the list of extra
95// parameters allowed:
96
97// ftp: use_tls (bool)
98// http: pki (string), verify (bool)
99// sftp: private_key (string), private_key_pass (string), validate_host (bool)
100// s3: aws_region (string), s3_bucket (string), use_ssl (bool), verify (bool), boto_defaults (bool)
101// file: normalize (bool)
102// azure: access_key (string), tenant_id (string), client_id (string), client_secret (string), allow_directory_access (bool), use_default_credentials (bool)
103
104// """
105
106// parsed = urlparse(url)
107
108// base = parsed.path or '/'
109// host = parsed.hostname
110// if host == ".":
111// base = "%s%s" % (host, base)
112// port = parsed.port
113// if parsed.password:
114// password = unquote(parsed.password)
115// else:
116// password = ''
117// user = parsed.username or ''
118
119// scheme = parsed.scheme.lower()
120// if scheme == 'ftp' or scheme == 'ftps':
121// valid_bool_keys = ['use_tls']
122// extras = _get_extras(parse_qs(parsed.query), valid_bool_keys=valid_bool_keys)
123// if scheme == 'ftps':
124// extras['use_tls'] = True
125
126// t = TransportFTP(base=base, host=host, password=password, user=user, port=port, **extras)
127
128// elif scheme == "sftp":
129// valid_str_keys = ['private_key', 'private_key_pass']
130// valid_bool_keys = ['validate_host']
131// extras = _get_extras(parse_qs(parsed.query), valid_str_keys=valid_str_keys, valid_bool_keys=valid_bool_keys)
132
133// t = TransportSFTP(base=base, host=host, password=password, user=user, port=port, **extras)
134
135// elif scheme == 'http' or scheme == 'https':
136// valid_str_keys = ['pki']
137// valid_bool_keys = ['verify']
138// extras = _get_extras(parse_qs(parsed.query), valid_str_keys=valid_str_keys, valid_bool_keys=valid_bool_keys)
139
140// t = TransportHTTP(scheme=scheme, base=base, host=host, password=password, user=user, port=port, **extras)
141
142
143// elif scheme == 's3':
144// valid_str_keys = ['aws_region', 's3_bucket']
145// valid_bool_keys = ['use_ssl', 'verify', 'boto_defaults']
146// extras = _get_extras(parse_qs(parsed.query), valid_str_keys=valid_str_keys, valid_bool_keys=valid_bool_keys)
147
148// # If user/password not specified, access might be dictated by IAM roles
149// if not user and not password:
150// user, password = None, None
151
152// t = TransportS3(base=base, host=host, port=port, accesskey=user, secretkey=password,
153// connection_attempts=connection_attempts, **extras)
154
155
156// else:
157// raise FileStoreException("Unknown transport: %s" % scheme)
158
159// return t
160
161
162// class FileStore(object):
163// def __init__(self, *transport_urls, connection_attempts=None):
164// self.log = logging.getLogger('assemblyline.transport')
165// self.transport_urls = transport_urls
166// self.transports = [create_transport(url, connection_attempts) for url in transport_urls]
167// self.local_transports = [
168// t for t in self.transports if isinstance(t, TransportLocal)
169// ]
170
171// def __eq__(self, obj: FileStore) -> bool:
172// return self.transport_urls == obj.transport_urls
173
174// def __enter__(self):
175// return self
176
177// def __exit__(self, ex_type, exc_val, exc_tb):
178// self.close()
179
180// def __str__(self):
181// return ', '.join(str(t) for t in self.transports)
182
183// def close(self):
184// for t in self.transports:
185// try:
186// t.close()
187// except Exception as ex:
188// trace = get_stacktrace_info(ex)
189// self.log.warning('Transport problem: %s', trace)
190
191// @elasticapm.capture_span(span_type='filestore')
192// def delete(self, path: str, location='all'):
193// with elasticapm.capture_span(name='delete', span_type='filestore', labels={'path': path}):
194// for t in self.slice(location):
195// try:
196// t.delete(path)
197// except Exception as ex:
198// trace = get_stacktrace_info(ex)
199// self.log.info('Transport problem: %s', trace)
200
201// @elasticapm.capture_span(span_type='filestore')
202// def download(self, src_path: str, dest_path: str, location='any'):
203// successful = False
204// transports = []
205// download_errors = []
206// for t in self.slice(location):
207// try:
208// t.download(src_path, dest_path)
209// transports.append(t)
210// successful = True
211// break
212// except Exception as ex:
213// download_errors.append((str(t), str(ex)))
214
215// if not successful:
216// raise FileStoreException('No transport succeeded => %s' % json.dumps(download_errors))
217// return transports
218
219// @elasticapm.capture_span(span_type='filestore')
220// def exists(self, path, location='any') -> list[Transport]:
221// transports = []
222// for t in self.slice(location):
223// try:
224// if t.exists(path):
225// transports.append(t)
226// if location == 'any':
227// break
228// except Exception as ex:
229// trace = get_stacktrace_info(ex)
230// self.log.warning('Transport problem: %s', trace)
231// return transports
232
233// @elasticapm.capture_span(span_type='filestore')
234// def get(self, path: str, location='any') -> Optional[bytes]:
235// for t in self.slice(location):
236// try:
237// return t.get(path)
238// except TransportException as ex:
239// if isinstance(ex.cause, FileNotFoundError):
240// pass
241// else:
242// trace = get_stacktrace_info(ex)
243// self.log.warning('Transport problem: %s', trace)
244// except Exception as ex:
245// trace = get_stacktrace_info(ex)
246// self.log.warning('Transport problem: %s', trace)
247// return None
248
249// @elasticapm.capture_span(span_type='filestore')
250// def put(self, dst_path: str, content: AnyStr, location='all', force=False) -> list[Transport]:
251// transports = []
252// for t in self.slice(location):
253// if force or not t.exists(dst_path):
254// transports.append(t)
255// t.put(dst_path, content)
256// if not t.exists(dst_path):
257// raise FileStoreException('File transfer failed. Remote file does not '
258// 'exist for %s on %s (%s)' % (dst_path, location, t))
259// return transports
260
261// def slice(self, location):
262// start, end = {
263// 'all': (0, len(self.transports)),
264// 'any': (0, len(self.transports)),
265// 'far': (-1, len(self.transports)),
266// 'near': (0, 1),
267// }[location]
268
269// transports = self.transports[start:end]
270// assert (len(transports) >= 1)
271// return transports
272
273// @elasticapm.capture_span(span_type='filestore')
274// def upload(self, src_path: str, dst_path: str, location='all', force=False, verify=False) -> list[Transport]:
275// transports = []
276// for t in self.slice(location):
277// if force or not t.exists(dst_path):
278// transports.append(t)
279// t.upload(src_path, dst_path)
280// if verify and not t.exists(dst_path):
281// raise FileStoreException('File transfer failed. Remote file does not '
282// 'exist for %s on %s (%s)' % (dst_path, location, t))
283// return transports
284
285// @elasticapm.capture_span(span_type='filestore')
286// def upload_batch(self, local_remote_tuples, location='all') -> list[Tuple[str, str, str]]:
287// failed_tuples = []
288// for (src_path, dst_path) in local_remote_tuples:
289// try:
290// self.upload(src_path, dst_path, location)
291// except Exception as ex:
292// trace = get_stacktrace_info(ex)
293// failed_tuples.append((src_path, dst_path, trace))
294// return failed_tuples