import hashlib
import httplib
import json
import logging
import optparse
import os
import shutil
import sys
import tarfile
import tempfile
import threading
import time
import urllib2
import urlparse
import zipfile
from subprocess import PIPE
from subprocess import Popen
__version__ = '1'
DEFAULT_MANIFEST_NAME = 'manifest.tt'
TOOLTOOL_PACKAGE_SUFFIX = '.TOOLTOOL-PACKAGE'
log = logging.getLogger(__name__)
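# A manifest is a JSON list of file records.  An illustrative (hypothetical)
# one-entry manifest, digest truncated for readability:
#   [
#     {
#       "filename": "clang.tar.xz",
#       "size": 123456789,
#       "algorithm": "sha512",
#       "digest": "3f786850e3...",
#       "unpack": true,
#       "visibility": "public"
#     }
#   ]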
class FileRecordJSONEncoderException(Exception):
pass
class InvalidManifest(Exception):
pass
class ExceptionWithFilename(Exception):
def __init__(self, filename):
Exception.__init__(self)
self.filename = filename
class BadFilenameException(ExceptionWithFilename):
pass
class DigestMismatchException(ExceptionWithFilename):
pass
class MissingFileException(ExceptionWithFilename):
pass
class FileRecord(object):
def __init__(self, filename, size, digest, algorithm, unpack=False,
visibility=None, setup=None):
object.__init__(self)
if '/' in filename or '\\' in filename:
log.error(
"The filename provided contains path information and is, therefore, invalid.")
raise BadFilenameException(filename=filename)
self.filename = filename
self.size = size
self.digest = digest
self.algorithm = algorithm
self.unpack = unpack
self.visibility = visibility
self.setup = setup
def __eq__(self, other):
if self is other:
return True
        return (self.filename == other.filename and
                self.size == other.size and
                self.digest == other.digest and
                self.algorithm == other.algorithm and
                self.visibility == other.visibility)
def __ne__(self, other):
return not self.__eq__(other)
def __str__(self):
return repr(self)
def __repr__(self):
return "%s.%s(filename='%s', size=%s, digest='%s', algorithm='%s', visibility=%r)" % (
__name__, self.__class__.__name__, self.filename, self.size,
self.digest, self.algorithm, self.visibility)
def present(self):
return os.path.exists(self.filename)
def validate_size(self):
if self.present():
return self.size == os.path.getsize(self.filename)
else:
log.debug(
"trying to validate size on a missing file, %s", self.filename)
raise MissingFileException(filename=self.filename)
def validate_digest(self):
if self.present():
with open(self.filename, 'rb') as f:
return self.digest == digest_file(f, self.algorithm)
else:
            log.debug(
                "trying to validate digest on a missing file, %s", self.filename)
raise MissingFileException(filename=self.filename)
    def validate(self):
        return self.validate_size() and self.validate_digest()
def describe(self):
if self.present() and self.validate():
return "'%s' is present and valid" % self.filename
elif self.present():
return "'%s' is present and invalid" % self.filename
else:
return "'%s' is absent" % self.filename
def create_file_record(filename, algorithm):
    stored_filename = os.path.split(filename)[1]
    with open(filename, 'rb') as fo:
        fr = FileRecord(stored_filename, os.path.getsize(filename),
                        digest_file(fo, algorithm), algorithm)
    return fr
class FileRecordJSONEncoder(json.JSONEncoder):
def encode_file_record(self, obj):
if not issubclass(type(obj), FileRecord):
err = "FileRecordJSONEncoder is only for FileRecord and lists of FileRecords, " \
"not %s" % obj.__class__.__name__
log.warn(err)
raise FileRecordJSONEncoderException(err)
else:
rv = {
'filename': obj.filename,
'size': obj.size,
'algorithm': obj.algorithm,
'digest': obj.digest,
}
if obj.unpack:
rv['unpack'] = True
if obj.visibility is not None:
rv['visibility'] = obj.visibility
if obj.setup:
rv['setup'] = obj.setup
return rv
def default(self, f):
if issubclass(type(f), list):
record_list = []
for i in f:
record_list.append(self.encode_file_record(i))
return record_list
else:
return self.encode_file_record(f)
class FileRecordJSONDecoder(json.JSONDecoder):
def process_file_records(self, obj):
if isinstance(obj, list):
record_list = []
for i in obj:
record = self.process_file_records(i)
if issubclass(type(record), FileRecord):
record_list.append(record)
return record_list
required_fields = [
'filename',
'size',
'algorithm',
'digest',
]
if isinstance(obj, dict):
missing = False
for req in required_fields:
if req not in obj:
missing = True
break
if not missing:
unpack = obj.get('unpack', False)
visibility = obj.get('visibility', None)
setup = obj.get('setup')
rv = FileRecord(
obj['filename'], obj['size'], obj['digest'], obj['algorithm'],
unpack, visibility, setup)
log.debug("materialized %s" % rv)
return rv
return obj
def decode(self, s):
decoded = json.JSONDecoder.decode(self, s)
rv = self.process_file_records(decoded)
return rv
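# Decoding a manifest string yields FileRecord objects directly, e.g.:
#   records = json.loads(manifest_text, cls=FileRecordJSONDecoder)
# where manifest_text holds JSON shaped like the illustrative manifest above.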
class Manifest(object):
valid_formats = ('json',)
def __init__(self, file_records=None):
self.file_records = file_records or []
def __eq__(self, other):
if self is other:
return True
if len(self.file_records) != len(other.file_records):
log.debug('Manifests differ in number of files')
return False
mine = sorted((fr.filename, fr) for fr in self.file_records)
theirs = sorted((fr.filename, fr) for fr in other.file_records)
return mine == theirs
def __ne__(self, other):
return not self.__eq__(other)
def __deepcopy__(self, memo):
return Manifest(self.file_records[:])
def __copy__(self):
return Manifest(self.file_records)
def copy(self):
return Manifest(self.file_records[:])
def present(self):
return all(i.present() for i in self.file_records)
def validate_sizes(self):
return all(i.validate_size() for i in self.file_records)
def validate_digests(self):
return all(i.validate_digest() for i in self.file_records)
def validate(self):
return all(i.validate() for i in self.file_records)
def load(self, data_file, fmt='json'):
assert fmt in self.valid_formats
if fmt == 'json':
try:
self.file_records.extend(
json.load(data_file, cls=FileRecordJSONDecoder))
except ValueError:
raise InvalidManifest("trying to read invalid manifest file")
def loads(self, data_string, fmt='json'):
assert fmt in self.valid_formats
if fmt == 'json':
try:
self.file_records.extend(
json.loads(data_string, cls=FileRecordJSONDecoder))
except ValueError:
raise InvalidManifest("trying to read invalid manifest file")
def dump(self, output_file, fmt='json'):
assert fmt in self.valid_formats
if fmt == 'json':
rv = json.dump(
self.file_records, output_file, indent=0, cls=FileRecordJSONEncoder,
separators=(',', ': '))
print >> output_file, ''
return rv
def dumps(self, fmt='json'):
assert fmt in self.valid_formats
if fmt == 'json':
return json.dumps(self.file_records, cls=FileRecordJSONEncoder)
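# Typical Manifest round trip (path is hypothetical):
#   manifest = Manifest()
#   with open('manifest.tt') as f:
#       manifest.load(f)
#   manifest.validate()  # True when every file matches its recorded digest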
def digest_file(f, a):
h = hashlib.new(a)
chunk_size = 1024 * 10
data = f.read(chunk_size)
while data:
h.update(data)
data = f.read(chunk_size)
name = repr(f.name) if hasattr(f, 'name') else 'a file'
log.debug('hashed %s with %s to be %s', name, a, h.hexdigest())
return h.hexdigest()
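# Example (hypothetical file):
#   with open('clang.tar.xz', 'rb') as f:
#       digest = digest_file(f, 'sha512')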
def execute(cmd):
process = Popen(cmd, shell=True, stdout=PIPE)
while True:
line = process.stdout.readline()
if not line:
break
log.info(line.replace('\n', ' '))
return process.wait() == 0
def open_manifest(manifest_file):
if os.path.exists(manifest_file):
manifest = Manifest()
with open(manifest_file) as f:
manifest.load(f)
log.debug("loaded manifest from file '%s'" % manifest_file)
return manifest
else:
log.debug("tried to load absent file '%s' as manifest" % manifest_file)
raise InvalidManifest(
"manifest file '%s' does not exist" % manifest_file)
def list_manifest(manifest_file):
try:
manifest = open_manifest(manifest_file)
except InvalidManifest as e:
log.error("failed to load manifest file at '%s': %s" % (
manifest_file,
str(e),
))
return False
for f in manifest.file_records:
print "%s\t%s\t%s" % ("P" if f.present() else "-",
"V" if f.present() and f.validate() else "-",
f.filename)
return True
def validate_manifest(manifest_file):
try:
manifest = open_manifest(manifest_file)
except InvalidManifest as e:
log.error("failed to load manifest file at '%s': %s" % (
manifest_file,
str(e),
))
return False
invalid_files = []
absent_files = []
for f in manifest.file_records:
if not f.present():
absent_files.append(f)
else:
if not f.validate():
invalid_files.append(f)
    return len(invalid_files + absent_files) == 0
def add_files(manifest_file, algorithm, filenames, visibility, unpack):
all_files_added = True
if os.path.exists(manifest_file):
old_manifest = open_manifest(manifest_file)
else:
old_manifest = Manifest()
log.debug("creating a new manifest file")
    new_manifest = Manifest()
    for filename in filenames:
log.debug("adding %s" % filename)
path, name = os.path.split(filename)
new_fr = create_file_record(filename, algorithm)
new_fr.visibility = visibility
new_fr.unpack = unpack
log.debug("appending a new file record to manifest file")
add = True
for fr in old_manifest.file_records:
log.debug("manifest file has '%s'" % "', ".join(
[x.filename for x in old_manifest.file_records]))
if new_fr == fr:
log.info("file already in old_manifest")
add = False
elif filename == fr.filename:
log.error("manifest already contains a different file named %s" % filename)
add = False
if add:
new_manifest.file_records.append(new_fr)
log.debug("added '%s' to manifest" % filename)
else:
all_files_added = False
new_filenames = set(fr.filename for fr in new_manifest.file_records)
for old_fr in old_manifest.file_records:
if old_fr.filename not in new_filenames:
new_manifest.file_records.append(old_fr)
with open(manifest_file, 'wb') as output:
new_manifest.dump(output, fmt='json')
return all_files_added
def touch(f):
try:
os.utime(f, None)
except OSError:
log.warn('impossible to update utime of file %s' % f)
def fetch_file(base_urls, file_record, grabchunk=1024 * 4, auth_file=None, region=None):
fd, temp_path = tempfile.mkstemp(dir=os.getcwd())
os.close(fd)
fetched_path = None
for base_url in base_urls:
url = urlparse.urljoin(base_url,
'%s/%s' % (file_record.algorithm, file_record.digest))
if region is not None:
url += '?region=' + region
log.info("Attempting to fetch from '%s'..." % base_url)
try:
req = urllib2.Request(url)
_authorize(req, auth_file)
f = urllib2.urlopen(req)
log.debug("opened %s for reading" % url)
with open(temp_path, 'wb') as out:
                size = 0
                while True:
                    indata = f.read(grabchunk)
                    if not indata:
                        break
                    out.write(indata)
                    size += len(indata)
log.info("File %s fetched from %s as %s" %
(file_record.filename, base_url, temp_path))
fetched_path = temp_path
break
except (urllib2.URLError, urllib2.HTTPError, ValueError) as e:
log.info("...failed to fetch '%s' from %s" %
(file_record.filename, base_url))
log.debug("%s" % e)
        except IOError:
            log.info("failed to write to temporary file for '%s'" %
                     file_record.filename, exc_info=True)
if fetched_path:
return os.path.split(fetched_path)[1]
else:
try:
os.remove(temp_path)
        except OSError:
            pass
return None
def clean_path(dirname):
if os.path.exists(dirname):
log.info('rm tree: %s' % dirname)
shutil.rmtree(dirname)
def unpack_file(filename, setup=None):
if tarfile.is_tarfile(filename):
tar_file, zip_ext = os.path.splitext(filename)
base_file, tar_ext = os.path.splitext(tar_file)
clean_path(base_file)
log.info('untarring "%s"' % filename)
tar = tarfile.open(filename)
tar.extractall()
tar.close()
elif filename.endswith('.tar.xz'):
base_file = filename.replace('.tar.xz', '')
clean_path(base_file)
log.info('untarring "%s"' % filename)
if not execute('tar -Jxf %s 2>&1' % filename):
return False
elif zipfile.is_zipfile(filename):
base_file = filename.replace('.zip', '')
clean_path(base_file)
log.info('unzipping "%s"' % filename)
z = zipfile.ZipFile(filename)
z.extractall()
z.close()
else:
log.error("Unknown archive extension for filename '%s'" % filename)
return False
if setup and not execute(os.path.join(base_file, setup)):
return False
return True
def fetch_files(manifest_file, base_urls, filenames=None, cache_folder=None,
                auth_file=None, region=None):
    filenames = filenames or []
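    # Fetch every file in the manifest (or only those named in `filenames`
    # when that list is non-empty), validating digests, populating the local
    # cache when one is configured, and queueing 'unpack' files for
    # extraction afterwards.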
try:
manifest = open_manifest(manifest_file)
except InvalidManifest as e:
log.error("failed to load manifest file at '%s': %s" % (
manifest_file,
str(e),
))
return False
present_files = []
failed_files = []
fetched_files = []
unpack_files = []
setup_files = {}
for f in manifest.file_records:
if f.present():
if f.validate():
present_files.append(f.filename)
if f.unpack:
unpack_files.append(f.filename)
else:
log.info("File %s is present locally but it is invalid, so I will remove it "
"and try to fetch it" % f.filename)
os.remove(os.path.join(os.getcwd(), f.filename))
if cache_folder and f.filename not in present_files:
try:
shutil.copy(os.path.join(cache_folder, f.digest),
os.path.join(os.getcwd(), f.filename))
log.info("File %s retrieved from local cache %s" %
(f.filename, cache_folder))
touch(os.path.join(cache_folder, f.digest))
filerecord_for_validation = FileRecord(
f.filename, f.size, f.digest, f.algorithm)
if filerecord_for_validation.validate():
present_files.append(f.filename)
if f.unpack:
unpack_files.append(f.filename)
else:
log.warn("File %s retrieved from cache is invalid! I am deleting it from the "
"cache as well" % f.filename)
os.remove(os.path.join(os.getcwd(), f.filename))
os.remove(os.path.join(cache_folder, f.digest))
except IOError:
log.info("File %s not present in local cache folder %s" %
(f.filename, cache_folder))
temp_file_name = None
if (f.filename in filenames or len(filenames) == 0) and f.filename not in present_files:
log.debug("fetching %s" % f.filename)
temp_file_name = fetch_file(base_urls, f, auth_file=auth_file, region=region)
if temp_file_name:
fetched_files.append((f, temp_file_name))
else:
failed_files.append(f.filename)
else:
log.debug("skipping %s" % f.filename)
if f.setup:
if f.unpack:
setup_files[f.filename] = f.setup
else:
log.error("'setup' requires 'unpack' being set for %s" % f.filename)
failed_files.append(f.filename)
for localfile, temp_file_name in fetched_files:
filerecord_for_validation = FileRecord(
temp_file_name, localfile.size, localfile.digest, localfile.algorithm)
if filerecord_for_validation.validate():
log.info("File integrity verified, renaming %s to %s" %
(temp_file_name, localfile.filename))
os.rename(os.path.join(os.getcwd(), temp_file_name),
os.path.join(os.getcwd(), localfile.filename))
if localfile.unpack:
unpack_files.append(localfile.filename)
if cache_folder:
log.info("Updating local cache %s..." % cache_folder)
try:
if not os.path.exists(cache_folder):
log.info("Creating cache in %s..." % cache_folder)
os.makedirs(cache_folder, 0700)
shutil.copy(os.path.join(os.getcwd(), localfile.filename),
os.path.join(cache_folder, localfile.digest))
log.info("Local cache %s updated with %s" % (cache_folder,
localfile.filename))
touch(os.path.join(cache_folder, localfile.digest))
except (OSError, IOError):
log.warning('Impossible to add file %s to cache folder %s' %
(localfile.filename, cache_folder), exc_info=True)
else:
failed_files.append(localfile.filename)
log.error("'%s'" % filerecord_for_validation.describe())
os.remove(temp_file_name)
for filename in unpack_files:
if not unpack_file(filename, setup_files.get(filename)):
failed_files.append(filename)
if len(failed_files) > 0:
log.error("The following files failed: '%s'" %
"', ".join(failed_files))
return False
return True
def freespace(p):
"Returns the number of bytes free under directory `p`"
    if sys.platform == 'win32':
        import win32file
        secsPerClus, bytesPerSec, nFreeClus, totClus = \
            win32file.GetDiskFreeSpace(p)
        return secsPerClus * bytesPerSec * nFreeClus
else:
r = os.statvfs(p)
return r.f_frsize * r.f_bavail
def purge(folder, gigs):
full_purge = bool(gigs == 0)
gigs *= 1024 * 1024 * 1024
if not full_purge and freespace(folder) >= gigs:
log.info("No need to cleanup")
return
files = []
for f in os.listdir(folder):
p = os.path.join(folder, f)
if not os.path.isfile(p):
continue
mtime = os.path.getmtime(p)
files.append((mtime, p))
for _, f in sorted(files):
log.info("removing %s to free up space" % f)
try:
os.remove(f)
except OSError:
log.info("Impossible to remove %s" % f, exc_info=True)
if not full_purge and freespace(folder) >= gigs:
break
def _log_api_error(e):
if hasattr(e, 'hdrs') and e.hdrs['content-type'] == 'application/json':
json_resp = json.load(e.fp)
log.error("%s: %s" % (json_resp['error']['name'],
json_resp['error']['description']))
else:
log.exception("Error making RelengAPI request:")
def _authorize(req, auth_file):
if auth_file:
log.debug("using bearer token in %s" % auth_file)
req.add_unredirected_header('Authorization',
'Bearer %s' % (open(auth_file).read().strip()))
def _send_batch(base_url, auth_file, batch, region):
url = urlparse.urljoin(base_url, 'upload')
if region is not None:
url += "?region=" + region
req = urllib2.Request(url, json.dumps(batch), {'Content-Type': 'application/json'})
_authorize(req, auth_file)
try:
resp = urllib2.urlopen(req)
except (urllib2.URLError, urllib2.HTTPError) as e:
_log_api_error(e)
return None
return json.load(resp)['result']
def _s3_upload(filename, file):
url = urlparse.urlparse(file['put_url'])
cls = httplib.HTTPSConnection if url.scheme == 'https' else httplib.HTTPConnection
host, port = url.netloc.split(':') if ':' in url.netloc else (url.netloc, 443)
port = int(port)
conn = cls(host, port)
try:
req_path = "%s?%s" % (url.path, url.query) if url.query else url.path
conn.request('PUT', req_path, open(filename),
{'Content-type': 'application/octet-stream'})
resp = conn.getresponse()
resp_body = resp.read()
conn.close()
if resp.status != 200:
raise RuntimeError("Non-200 return from AWS: %s %s\n%s" %
(resp.status, resp.reason, resp_body))
except Exception:
file['upload_exception'] = sys.exc_info()
file['upload_ok'] = False
else:
file['upload_ok'] = True
def _notify_upload_complete(base_url, auth_file, file):
req = urllib2.Request(
urlparse.urljoin(
base_url,
'upload/complete/%(algorithm)s/%(digest)s' % file))
_authorize(req, auth_file)
try:
urllib2.urlopen(req)
except urllib2.HTTPError as e:
if e.code != 409:
_log_api_error(e)
return
to_wait = int(e.headers.get('X-Retry-After', 60))
log.warning("Waiting %d seconds for upload URLs to expire" % to_wait)
time.sleep(to_wait)
_notify_upload_complete(base_url, auth_file, file)
except Exception:
log.exception("While notifying server of upload completion:")
def upload(manifest, message, base_urls, auth_file, region):
try:
manifest = open_manifest(manifest)
except InvalidManifest:
log.exception("failed to load manifest file at '%s'")
return False
if not manifest.validate():
log.error('manifest is invalid')
return False
    if any(fr.visibility is None for fr in manifest.file_records):
        log.error('All files in a manifest for upload must have a visibility set')
        return False
batch = {
'message': message,
'files': {},
}
for fr in manifest.file_records:
batch['files'][fr.filename] = {
'size': fr.size,
'digest': fr.digest,
'algorithm': fr.algorithm,
'visibility': fr.visibility,
}
resp = _send_batch(base_urls[0], auth_file, batch, region)
if not resp:
return None
files = resp['files']
threads = {}
for filename, file in files.iteritems():
if 'put_url' in file:
log.info("%s: starting upload" % (filename,))
thd = threading.Thread(target=_s3_upload,
args=(filename, file))
            thd.daemon = True
thd.start()
threads[filename] = thd
else:
log.info("%s: already exists on server" % (filename,))
success = True
while threads:
for filename, thread in threads.items():
if not thread.is_alive():
file = files[filename]
thread.join()
if file['upload_ok']:
log.info("%s: uploaded" % filename)
else:
log.error("%s: failed" % filename,
exc_info=file['upload_exception'])
success = False
del threads[filename]
for filename, file in files.iteritems():
if 'put_url' in file and file['upload_ok']:
log.info("notifying server of upload completion for %s" % (filename,))
_notify_upload_complete(base_urls[0], auth_file, file)
return success
def process_command(options, args):
cmd = args[0]
cmd_args = args[1:]
log.debug("processing '%s' command with args '%s'" %
(cmd, '", "'.join(cmd_args)))
log.debug("using options: %s" % options)
if cmd == 'list':
return list_manifest(options['manifest'])
if cmd == 'validate':
return validate_manifest(options['manifest'])
elif cmd == 'add':
return add_files(options['manifest'], options['algorithm'], cmd_args,
options['visibility'], options['unpack'])
elif cmd == 'purge':
if options['cache_folder']:
purge(folder=options['cache_folder'], gigs=options['size'])
else:
log.critical('please specify the cache folder to be purged')
return False
elif cmd == 'fetch':
return fetch_files(
options['manifest'],
options['base_url'],
cmd_args,
cache_folder=options['cache_folder'],
auth_file=options.get("auth_file"),
region=options.get('region'))
elif cmd == 'upload':
if not options.get('message'):
log.critical('upload command requires a message')
return False
return upload(
options.get('manifest'),
options.get('message'),
options.get('base_url'),
options.get('auth_file'),
options.get('region'))
else:
log.critical('command "%s" is not implemented' % cmd)
return False
def main(argv, _skip_logging=False):
parser = optparse.OptionParser()
parser.add_option('-q', '--quiet', default=logging.INFO,
dest='loglevel', action='store_const', const=logging.ERROR)
parser.add_option('-v', '--verbose',
dest='loglevel', action='store_const', const=logging.DEBUG)
parser.add_option('-m', '--manifest', default=DEFAULT_MANIFEST_NAME,
dest='manifest', action='store',
help='specify the manifest file to be operated on')
parser.add_option('-d', '--algorithm', default='sha512',
dest='algorithm', action='store',
help='hashing algorithm to use (only sha512 is allowed)')
parser.add_option('--visibility', default=None,
dest='visibility', choices=['internal', 'public'],
help='Visibility level of this file; "internal" is for '
'files that cannot be distributed out of the company '
'but not for secrets; "public" files are available to '
                           'anyone without restriction')
parser.add_option('--unpack', default=False,
dest='unpack', action='store_true',
help='Request unpacking this file after fetch.'
' This is helpful with tarballs.')
parser.add_option('-o', '--overwrite', default=False,
dest='overwrite', action='store_true',
help='UNUSED; present for backward compatibility')
parser.add_option('--url', dest='base_url', action='append',
help='RelengAPI URL ending with /tooltool/; default '
'is appropriate for Mozilla')
parser.add_option('-c', '--cache-folder', dest='cache_folder',
help='Local cache folder')
parser.add_option('-s', '--size',
help='free space required (in GB)', dest='size',
type='float', default=0.)
parser.add_option('-r', '--region', help='Preferred AWS region for upload or fetch; '
'example: --region=us-west-2')
parser.add_option('--message',
help='The "commit message" for an upload; format with a bug number '
'and brief comment',
dest='message')
parser.add_option('--authentication-file',
help='Use the RelengAPI token found in the given file to '
'authenticate to the RelengAPI server.',
dest='auth_file')
(options_obj, args) = parser.parse_args(argv[1:])
if not options_obj.base_url:
options_obj.base_url = ['https://api.pub.build.mozilla.org/tooltool/']
def add_slash(url):
return url if url.endswith('/') else (url + '/')
options_obj.base_url = [add_slash(u) for u in options_obj.base_url]
if options_obj.auth_file:
options_obj.auth_file = os.path.expanduser(options_obj.auth_file)
options = vars(options_obj)
log.setLevel(options['loglevel'])
    if not _skip_logging:
        ch = logging.StreamHandler()
        cf = logging.Formatter("%(levelname)s - %(message)s")
        ch.setFormatter(cf)
        log.addHandler(ch)
if options['algorithm'] != 'sha512':
parser.error('only --algorithm sha512 is supported')
if len(args) < 1:
parser.error('You must specify a command')
return 0 if process_command(options, args) else 1
if __name__ == "__main__": sys.exit(main(sys.argv))