import unittest
import shutil
import os
import sys
import random
import copy
from string import letters
if not __file__:
__file__ = sys.argv[0]
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from MozZipFile import ZipFile
import zipfile
leafs = (
'firstdir/oneleaf',
'seconddir/twoleaf',
'thirddir/with/sub/threeleaf')
_lengths = map(lambda n: n * 64, [16, 64, 80])
lengths = 3
writes = 5
def givenlength(i):
return _lengths[i]
def prod(*iterables):
for item in iterables[0]:
if len(iterables) == 1:
yield [item]
else:
for others in prod(*iterables[1:]):
yield [item] + others
def getid(descs):
'Convert a list of ints to a string.'
return reduce(lambda x, y: x+'{0}{1}'.format(*tuple(y)), descs, '')
def getContent(length):
'Get pseudo random content of given length.'
rv = [None] * length
for i in xrange(length):
rv[i] = random.choice(letters)
return ''.join(rv)
def createWriter(sizer, *items):
'Helper method to fill in tests, one set of writes, one for each item'
locitems = copy.deepcopy(items)
for item in locitems:
item['length'] = sizer(item.pop('length', 0))
def helper(self):
mode = 'w'
if os.path.isfile(self.f):
mode = 'a'
zf = ZipFile(self.f, mode, self.compression)
for item in locitems:
self._write(zf, **item)
zf = None
pass
return helper
def createTester(name, *writes):
_writes = copy.copy(writes)
def tester(self):
for w in _writes:
getattr(self, w)()
self._verifyZip()
pass
tester.__name__ = name
return tester
class TestExtensiveStored(unittest.TestCase):
stage = "mozzipfilestage"
compression = zipfile.ZIP_STORED
def leaf(self, *leafs):
return os.path.join(self.stage, *leafs)
def setUp(self):
if os.path.exists(self.stage):
shutil.rmtree(self.stage)
os.mkdir(self.stage)
self.f = self.leaf('test.jar')
self.ref = {}
self.seed = 0
def tearDown(self):
self.f = None
self.ref = None
def _verifyZip(self):
zf = zipfile.ZipFile(self.f)
badEntry = zf.testzip()
self.failIf(badEntry, badEntry)
zlist = zf.namelist()
zlist.sort()
vlist = self.ref.keys()
vlist.sort()
self.assertEqual(zlist, vlist)
for leaf, content in self.ref.iteritems():
zcontent = zf.read(leaf)
self.assertEqual(content, zcontent)
def _write(self, zf, seed=None, leaf=0, length=0):
if seed is None:
seed = self.seed
self.seed += 1
random.seed(seed)
leaf = leafs[leaf]
content = getContent(length)
self.ref[leaf] = content
zf.writestr(leaf, content)
dir = os.path.dirname(self.leaf('stage', leaf))
if not os.path.isdir(dir):
os.makedirs(dir)
open(self.leaf('stage', leaf), 'w').write(content)
atomics = list(prod(xrange(len(leafs)), xrange(lengths)))
for w in xrange(writes):
nonatomics = [list(prod(range(min(i, len(leafs))), xrange(lengths)))
for i in xrange(1, w+1)] + [atomics]
for descs in prod(*nonatomics):
suffix = getid(descs)
dicts = [dict(leaf=leaf, length=length) for leaf, length in descs]
setattr(TestExtensiveStored, '_write' + suffix,
createWriter(givenlength, *dicts))
setattr(TestExtensiveStored, 'test' + suffix,
createTester('test' + suffix, '_write' + suffix))
files = [list(prod([i], xrange(lengths))) for i in xrange(len(leafs))]
allfiles = reduce(lambda l, r: l+r,
[list(prod(*files[:(i+1)])) for i in xrange(len(leafs))])
for first in allfiles:
testbasename = 'test{0}_'.format(getid(first))
test = [None, '_write' + getid(first), None]
for second in atomics:
test[0] = testbasename + getid([second])
test[2] = '_write' + getid([second])
setattr(TestExtensiveStored, test[0], createTester(*test))
class TestExtensiveDeflated(TestExtensiveStored):
'Test all that has been tested with ZIP_STORED with DEFLATED, too.'
compression = zipfile.ZIP_DEFLATED
if __name__ == '__main__':
unittest.main()