__author__ = 'jcgregorio@google.com (Joe Gregorio)'
import StringIO
import base64
import copy
import gzip
import httplib2
import logging
import mimeparse
import mimetypes
import os
import random
import sys
import time
import urllib
import urlparse
import uuid
from email.generator import Generator
from email.mime.multipart import MIMEMultipart
from email.mime.nonmultipart import MIMENonMultipart
from email.parser import FeedParser
from errors import BatchError
from errors import HttpError
from errors import InvalidChunkSizeError
from errors import ResumableUploadError
from errors import UnexpectedBodyError
from errors import UnexpectedMethodError
from model import JsonModel
from oauth2client import util
from oauth2client.anyjson import simplejson
# Default value of the ``chunksize`` argument accepted by the media upload and
# download classes in this module (512 * 1024).
DEFAULT_CHUNK_SIZE = 512*1024
# HttpRequest.execute() compares len(uri) against this limit; a GET whose URI
# is longer is re-issued as a POST carrying the query string in the body.
MAX_URI_LENGTH = 2048
class MediaUploadProgress(object):
    """Snapshot of how far a resumable upload has progressed."""

    def __init__(self, resumable_progress, total_size):
        """Store the progress counters.

        Args:
          resumable_progress: number of bytes accounted for so far.
          total_size: total number of bytes of the upload, or None when the
            total is not known.
        """
        self.resumable_progress = resumable_progress
        self.total_size = total_size

    def progress(self):
        """Return the completed fraction of the upload as a float.

        Returns:
          resumable_progress / total_size, or 0.0 when total_size is None or
          zero (an empty upload has no meaningful ratio and must not raise
          ZeroDivisionError).
        """
        if self.total_size is not None and self.total_size != 0:
            return float(self.resumable_progress) / float(self.total_size)
        return 0.0
class MediaDownloadProgress(object):
    """Snapshot of how far a chunked download has progressed."""

    def __init__(self, resumable_progress, total_size):
        """Store the progress counters.

        Args:
          resumable_progress: number of bytes received so far.
          total_size: total number of bytes of the download, or None when the
            total is not known.
        """
        self.resumable_progress = resumable_progress
        self.total_size = total_size

    def progress(self):
        """Return the completed fraction of the download as a float.

        Returns:
          resumable_progress / total_size, or 0.0 when total_size is None or
          zero (a zero-byte download must not raise ZeroDivisionError).
        """
        if self.total_size is not None and self.total_size != 0:
            return float(self.resumable_progress) / float(self.total_size)
        return 0.0
class MediaUpload(object):
    """Base description of a media body to upload.

    Subclasses override the accessors below. The defaults describe a
    non-resumable, stream-less upload of unknown size with a generic
    mimetype; chunksize(), getbytes() and stream() have no default.
    """

    def chunksize(self):
        """Chunk size for resumable uploads. Must be overridden."""
        raise NotImplementedError()

    def mimetype(self):
        """Mime type of the body; 'application/octet-stream' by default."""
        return 'application/octet-stream'

    def size(self):
        """Size of the body, or None (the default) when unknown."""
        return None

    def resumable(self):
        """Whether this is a resumable upload; False by default."""
        return False

    def getbytes(self, begin, end):
        """Return bytes of the media. Must be overridden."""
        raise NotImplementedError()

    def has_stream(self):
        """Whether stream() is usable; False by default."""
        return False

    def stream(self):
        """Return a stream over the media. Must be overridden."""
        raise NotImplementedError()

    @util.positional(1)
    def _to_json(self, strip=None):
        """Serialize the instance attributes to a JSON string.

        Args:
          strip: optional iterable of attribute names to leave out of the
            output. Every listed name must exist on the instance, otherwise
            KeyError is raised.

        Returns:
          JSON text of the instance __dict__ (minus stripped members) plus
          '_class' and '_module' entries naming the concrete type.
        """
        klass = type(self)
        state = copy.copy(self.__dict__)
        if strip is not None:
            for name in strip:
                del state[name]
        state['_class'] = klass.__name__
        state['_module'] = klass.__module__
        return simplejson.dumps(state)

    def to_json(self):
        """Serialize this object to JSON without stripping any member."""
        return self._to_json()

    @classmethod
    def new_from_json(cls, s):
        """Rebuild an upload object from JSON produced by to_json().

        The '_module' and '_class' entries select the class, whose own
        from_json() is then called with the original string.

        Args:
          s: JSON string.

        Returns:
          Whatever the selected class's from_json(s) returns.
        """
        parsed = simplejson.loads(s)
        module_name = parsed['_module']
        loaded = __import__(module_name,
                            fromlist=module_name.split('.')[:-1])
        target = getattr(loaded, parsed['_class'])
        return target.from_json(s)
class MediaIoBaseUpload(MediaUpload):
    """MediaUpload backed by a seekable file-like object."""

    @util.positional(3)
    def __init__(self, fd, mimetype, chunksize=DEFAULT_CHUNK_SIZE,
                 resumable=False):
        """Wrap a file-like object.

        Args:
          fd: object providing seek(), tell() and read().
          mimetype: mime type reported by mimetype().
          chunksize: chunk size; must be -1 or greater than zero.
          resumable: value reported by resumable().

        Raises:
          InvalidChunkSizeError: if chunksize is neither -1 nor positive.
        """
        super(MediaIoBaseUpload, self).__init__()
        self._fd = fd
        self._mimetype = mimetype
        if chunksize != -1 and chunksize <= 0:
            raise InvalidChunkSizeError()
        self._chunksize = chunksize
        self._resumable = resumable
        # The size is wherever the cursor lands after seeking to the end.
        fd.seek(0, os.SEEK_END)
        self._size = fd.tell()

    def chunksize(self):
        """Return the chunk size given to the constructor."""
        return self._chunksize

    def mimetype(self):
        """Return the mime type given to the constructor."""
        return self._mimetype

    def size(self):
        """Return the size measured at construction time."""
        return self._size

    def resumable(self):
        """Return the resumable flag given to the constructor."""
        return self._resumable

    def getbytes(self, begin, length):
        """Seek to offset `begin` and return the result of read(length)."""
        source = self._fd
        source.seek(begin)
        return source.read(length)

    def has_stream(self):
        """A stream is always available for this class."""
        return True

    def stream(self):
        """Return the wrapped file-like object itself."""
        return self._fd

    def to_json(self):
        """Always raises: an arbitrary file object cannot be serialized."""
        raise NotImplementedError('MediaIoBaseUpload is not serializable.')
class MediaFileUpload(MediaIoBaseUpload):
    """MediaIoBaseUpload whose content comes from a file on disk."""

    @util.positional(2)
    def __init__(self, filename, mimetype=None, chunksize=DEFAULT_CHUNK_SIZE,
                 resumable=False):
        """Open `filename` for binary reading and wrap it.

        Args:
          filename: path of the file to upload.
          mimetype: mime type of the file. When None it is guessed from the
            file name, falling back to 'application/octet-stream' if the
            guess fails.
          chunksize: chunk size, passed through to MediaIoBaseUpload.
          resumable: resumable flag, passed through to MediaIoBaseUpload.
        """
        self._filename = filename
        fd = open(self._filename, 'rb')
        if mimetype is None:
            mimetype, _ = mimetypes.guess_type(filename)
            if mimetype is None:
                # guess_type() returns None for unknown extensions; a None
                # mimetype would later be sent as a header value, so use the
                # same generic default as MediaUpload.mimetype().
                mimetype = 'application/octet-stream'
        try:
            super(MediaFileUpload, self).__init__(
                fd, mimetype, chunksize=chunksize, resumable=resumable)
        except Exception:
            # Do not leak the handle when the base class rejects the
            # arguments (for example an invalid chunksize).
            fd.close()
            raise

    def to_json(self):
        """Serialize to JSON, leaving out the open file object."""
        return self._to_json(strip=['_fd'])

    @staticmethod
    def from_json(s):
        """Recreate a MediaFileUpload from the output of to_json().

        Args:
          s: JSON string containing '_filename', '_mimetype', '_chunksize'
            and '_resumable'.

        Returns:
          A new MediaFileUpload; the file is reopened from '_filename'.
        """
        d = simplejson.loads(s)
        return MediaFileUpload(d['_filename'], mimetype=d['_mimetype'],
                               chunksize=d['_chunksize'],
                               resumable=d['_resumable'])
class MediaInMemoryUpload(MediaIoBaseUpload):
    """MediaIoBaseUpload whose content is a string held in memory."""

    @util.positional(2)
    def __init__(self, body, mimetype='application/octet-stream',
                 chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
        """Wrap `body` in a StringIO and hand it to MediaIoBaseUpload.

        Args:
          body: the content to upload.
          mimetype: mime type of the content.
          chunksize: chunk size, passed through to MediaIoBaseUpload.
          resumable: resumable flag, passed through to MediaIoBaseUpload.
        """
        in_memory = StringIO.StringIO(body)
        super(MediaInMemoryUpload, self).__init__(
            in_memory, mimetype, chunksize=chunksize, resumable=resumable)
class MediaIoBaseDownload(object):
    """Download media in chunks, writing each chunk to a file-like object."""

    @util.positional(3)
    def __init__(self, fd, request, chunksize=DEFAULT_CHUNK_SIZE):
        """Prepare a chunked download.

        Args:
          fd: object with a write() method that receives the content.
          request: object whose `uri` is downloaded and whose `http` is used
            to issue the requests.
          chunksize: number added to the current offset to form the end of
            each requested byte range.
        """
        self._fd = fd
        self._request = request
        self._uri = request.uri
        self._chunksize = chunksize
        self._progress = 0
        self._total_size = None
        self._done = False

        # Kept as attributes so they can be replaced (e.g. in tests).
        self._sleep = time.sleep
        self._rand = random.random

    @util.positional(1)
    def next_chunk(self, num_retries=0):
        """Fetch the next chunk and write it to the file object.

        Args:
          num_retries: how many extra attempts to make when the response
            status is 500 or above. Each retry is preceded by a sleep of
            rand() * 2**retry_num seconds.

        Returns:
          (status, done): a MediaDownloadProgress and a boolean that is True
          once the number of bytes written equals the known total size.

        Raises:
          HttpError: if the final response status is not 200 or 206.
        """
        # NOTE(review): HTTP byte ranges are inclusive, so this asks for
        # chunksize + 1 bytes per request; left unchanged on purpose.
        headers = {
            'range': 'bytes=%d-%d' % (
                self._progress, self._progress + self._chunksize)
        }
        http = self._request.http

        for retry_num in xrange(num_retries + 1):
            if retry_num > 0:
                # `resp` is the response of the previous iteration here.
                self._sleep(self._rand() * 2**retry_num)
                logging.warning(
                    'Retry #%d for media download: GET %s, following status: %d'
                    % (retry_num, self._uri, resp.status))

            resp, content = http.request(self._uri, headers=headers)
            if resp.status < 500:
                break

        if resp.status in [200, 206]:
            if ('content-location' in resp and
                    resp['content-location'] != self._uri):
                self._uri = resp['content-location']
            self._progress += len(content)
            self._fd.write(content)

            if 'content-range' in resp:
                content_range = resp['content-range']
                length = content_range.rsplit('/', 1)[1]
                self._total_size = int(length)
            elif 'content-length' in resp:
                # No content-range header (e.g. a plain 200 response): fall
                # back to content-length so the download can be recognised
                # as complete instead of never reporting done.
                self._total_size = int(resp['content-length'])

            if self._progress == self._total_size:
                self._done = True
            return (MediaDownloadProgress(self._progress, self._total_size),
                    self._done)
        else:
            raise HttpError(resp, content, uri=self._uri)
class _StreamSlice(object):
def __init__(self, stream, begin, chunksize):
self._stream = stream
self._begin = begin
self._chunksize = chunksize
self._stream.seek(begin)
def read(self, n=-1):
cur = self._stream.tell()
end = self._begin + self._chunksize
if n == -1 or cur + n > end:
n = end - cur
return self._stream.read(n)
class HttpRequest(object):
    """A single HTTP request, optionally with a resumable media upload."""

    @util.positional(4)
    def __init__(self, http, postproc, uri,
                 method='GET',
                 body=None,
                 headers=None,
                 methodId=None,
                 resumable=None):
        """Record the pieces of the request.

        Args:
          http: object whose request() method sends the request.
          postproc: callable invoked as postproc(resp, content) on success;
            its return value is what execute() returns.
          uri: URI to send the request to.
          method: HTTP method.
          body: request body, or None.
          headers: dict of request headers, or None for no headers.
          methodId: identifier stored on the instance as `methodId`.
          resumable: media upload object (see next_chunk()), or None.
        """
        self.uri = uri
        self.method = method
        self.body = body
        self.headers = headers or {}
        self.methodId = methodId
        self.http = http
        self.postproc = postproc
        self.resumable = resumable
        self.response_callbacks = []
        self._in_error_state = False

        # Parse the content type. Read it from self.headers, not the raw
        # argument, so the default headers=None does not raise
        # AttributeError. The parsed pieces are not used afterwards.
        major, minor, params = mimeparse.parse_mime_type(
            self.headers.get('content-type', 'application/json'))

        # Size of the non-media part of the request.
        self.body_size = len(self.body or '')

        # URI that upload chunks are sent to; set by the first next_chunk().
        self.resumable_uri = None

        # Offset of the next byte to upload.
        self.resumable_progress = 0

        # Kept as attributes so they can be replaced (e.g. in tests).
        self._rand = random.random
        self._sleep = time.sleep

    @util.positional(1)
    def execute(self, http=None, num_retries=0):
        """Send the request and return the post-processed response.

        Args:
          http: object to send the request with; defaults to self.http.
          num_retries: how many extra attempts to make when the response
            status is 500 or above. Each retry is preceded by a sleep of
            rand() * 2**retry_num seconds.

        Returns:
          The value returned by postproc(resp, content).

        Raises:
          HttpError: if the final response status is 300 or above.
        """
        if http is None:
            http = self.http

        if self.resumable:
            # Drive the chunked upload until a final body comes back.
            body = None
            while body is None:
                _, body = self.next_chunk(http=http, num_retries=num_retries)
            return body

        # Non-resumable case.
        if 'content-length' not in self.headers:
            self.headers['content-length'] = str(self.body_size)

        # A GET whose URI is too long is tunnelled through POST, with the
        # query string moved into a form-encoded body.
        if len(self.uri) > MAX_URI_LENGTH and self.method == 'GET':
            self.method = 'POST'
            self.headers['x-http-method-override'] = 'GET'
            self.headers['content-type'] = 'application/x-www-form-urlencoded'
            parsed = urlparse.urlparse(self.uri)
            self.uri = urlparse.urlunparse(
                (parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                 None, None)
            )
            self.body = parsed.query
            self.headers['content-length'] = str(len(self.body))

        for retry_num in xrange(num_retries + 1):
            if retry_num > 0:
                # `resp` is the response of the previous iteration here.
                self._sleep(self._rand() * 2**retry_num)
                logging.warning(
                    'Retry #%d for request: %s %s, following status: %d'
                    % (retry_num, self.method, self.uri, resp.status))

            resp, content = http.request(str(self.uri),
                                         method=str(self.method),
                                         body=self.body,
                                         headers=self.headers)
            if resp.status < 500:
                break

        for callback in self.response_callbacks:
            callback(resp)
        if resp.status >= 300:
            raise HttpError(resp, content, uri=self.uri)
        return self.postproc(resp, content)

    @util.positional(2)
    def add_response_callback(self, cb):
        """Register `cb`; execute() calls it as cb(resp) for the response."""
        self.response_callbacks.append(cb)

    @util.positional(1)
    def next_chunk(self, http=None, num_retries=0):
        """Upload the next chunk of the resumable media.

        The first call sends the original request to obtain the upload URI
        from the 'location' response header, then sends the first chunk.

        Args:
          http: object to send the requests with; defaults to self.http.
          num_retries: how many extra attempts to make when a response
            status is 500 or above.

        Returns:
          (status, body). While the upload is incomplete, status is a
          MediaUploadProgress and body is None. When it completes, status is
          None and body is the result of postproc(resp, content).

        Raises:
          ResumableUploadError: if the upload URI could not be obtained.
          HttpError: if a chunk response is not 200, 201 or 308.
        """
        if http is None:
            http = self.http

        if self.resumable.size() is None:
            size = '*'
        else:
            size = str(self.resumable.size())

        if self.resumable_uri is None:
            start_headers = copy.copy(self.headers)
            start_headers['X-Upload-Content-Type'] = self.resumable.mimetype()
            if size != '*':
                start_headers['X-Upload-Content-Length'] = size
            start_headers['content-length'] = str(self.body_size)

            for retry_num in xrange(num_retries + 1):
                if retry_num > 0:
                    self._sleep(self._rand() * 2**retry_num)
                    logging.warning(
                        'Retry #%d for resumable URI request: %s %s, following status: %d'
                        % (retry_num, self.method, self.uri, resp.status))

                resp, content = http.request(self.uri, method=self.method,
                                             body=self.body,
                                             headers=start_headers)
                if resp.status < 500:
                    break

            if resp.status == 200 and 'location' in resp:
                self.resumable_uri = resp['location']
            else:
                raise ResumableUploadError(resp, content)
        elif self._in_error_state:
            # After a failure, send an empty PUT to ask the server how much
            # of the upload it already has before sending more data.
            headers = {
                'Content-Range': 'bytes */%s' % size,
                'content-length': '0'
            }
            resp, content = http.request(self.resumable_uri, 'PUT',
                                         headers=headers)
            status, body = self._process_response(resp, content)
            if body:
                # The upload had in fact completed.
                return (status, body)

        # Pass a stream as the body only when the media offers one and the
        # interpreter's minor version is at least 6; otherwise read the
        # chunk into memory.
        if self.resumable.has_stream() and sys.version_info[1] >= 6:
            data = self.resumable.stream()
            if self.resumable.chunksize() == -1:
                # Everything from the current offset to the end goes out in
                # one request, so the last byte index is size - 1 regardless
                # of how much was already uploaded.
                data.seek(self.resumable_progress)
                chunk_end = self.resumable.size() - 1
            else:
                # Chunked upload from a stream: expose only one chunk of it.
                data = _StreamSlice(data, self.resumable_progress,
                                    self.resumable.chunksize())
                chunk_end = min(
                    self.resumable_progress + self.resumable.chunksize() - 1,
                    self.resumable.size() - 1)
        else:
            data = self.resumable.getbytes(
                self.resumable_progress, self.resumable.chunksize())

            # A short read means the end was reached, so the total size is
            # now known.
            if len(data) < self.resumable.chunksize():
                size = str(self.resumable_progress + len(data))

            chunk_end = self.resumable_progress + len(data) - 1

        headers = {
            'Content-Range': 'bytes %d-%d/%s' % (
                self.resumable_progress, chunk_end, size),
            # Set explicitly because the body may be a stream wrapper.
            'Content-Length': str(chunk_end - self.resumable_progress + 1)
        }

        for retry_num in xrange(num_retries + 1):
            if retry_num > 0:
                self._sleep(self._rand() * 2**retry_num)
                logging.warning(
                    'Retry #%d for media upload: %s %s, following status: %d'
                    % (retry_num, self.method, self.uri, resp.status))

            try:
                resp, content = http.request(self.resumable_uri, method='PUT',
                                             body=data,
                                             headers=headers)
            except:
                # Deliberately catches everything: remember that the upload
                # state is unknown, then let the error propagate.
                self._in_error_state = True
                raise
            if resp.status < 500:
                break

        return self._process_response(resp, content)

    def _process_response(self, resp, content):
        """Interpret the response to an upload chunk.

        Args:
          resp: response object with a `status` attribute and header lookup.
          content: response body.

        Returns:
          (None, postproc result) for status 200 or 201, or
          (MediaUploadProgress, None) for status 308.

        Raises:
          HttpError: for any other status; the error state flag is set.
        """
        if resp.status in [200, 201]:
            self._in_error_state = False
            return None, self.postproc(resp, content)
        elif resp.status == 308:
            self._in_error_state = False
            # The upload is not finished; the 'range' header tells how far
            # the server got.
            try:
                self.resumable_progress = (
                    int(resp['range'].split('-')[1]) + 1)
            except KeyError:
                # No 'range' header: nothing has been stored yet.
                self.resumable_progress = 0
            if 'location' in resp:
                self.resumable_uri = resp['location']
        else:
            self._in_error_state = True
            raise HttpError(resp, content, uri=self.uri)

        return (MediaUploadProgress(self.resumable_progress,
                                    self.resumable.size()),
                None)

    def to_json(self):
        """Serialize the request state to a JSON string.

        The `http`, `postproc`, `_sleep` and `_rand` members are left out,
        and `resumable`, when set, is replaced by its own to_json() output.
        """
        d = copy.copy(self.__dict__)
        if d['resumable'] is not None:
            d['resumable'] = self.resumable.to_json()
        del d['http']
        del d['postproc']
        del d['_sleep']
        del d['_rand']

        return simplejson.dumps(d)

    @staticmethod
    def from_json(s, http, postproc):
        """Build an HttpRequest from the output of to_json().

        Args:
          s: JSON string.
          http: object to send the request with.
          postproc: response post-processing callable.

        Returns:
          A new HttpRequest built from the 'uri', 'method', 'body',
          'headers', 'methodId' and 'resumable' entries.
        """
        d = simplejson.loads(s)
        if d['resumable'] is not None:
            d['resumable'] = MediaUpload.new_from_json(d['resumable'])
        return HttpRequest(
            http,
            postproc,
            uri=d['uri'],
            method=d['method'],
            body=d['body'],
            headers=d['headers'],
            methodId=d['methodId'],
            resumable=d['resumable'])
class BatchHttpRequest(object):
@util.positional(1)
def __init__(self, callback=None, batch_uri=None):
if batch_uri is None:
batch_uri = 'https://www.googleapis.com/batch'
self._batch_uri = batch_uri
self._callback = callback
self._requests = {}
self._callbacks = {}
self._order = []
self._last_auto_id = 0
self._base_id = None
self._responses = {}
self._refreshed_credentials = {}
def _refresh_and_apply_credentials(self, request, http):
creds = None
if request.http is not None and hasattr(request.http.request,
'credentials'):
creds = request.http.request.credentials
elif http is not None and hasattr(http.request, 'credentials'):
creds = http.request.credentials
if creds is not None:
if id(creds) not in self._refreshed_credentials:
creds.refresh(http)
self._refreshed_credentials[id(creds)] = 1
if request.http is None or not hasattr(request.http.request,
'credentials'):
creds.apply(request.headers)
def _id_to_header(self, id_):
if self._base_id is None:
self._base_id = uuid.uuid4()
return '<%s+%s>' % (self._base_id, urllib.quote(id_))
def _header_to_id(self, header):
if header[0] != '<' or header[-1] != '>':
raise BatchError("Invalid value for Content-ID: %s" % header)
if '+' not in header:
raise BatchError("Invalid value for Content-ID: %s" % header)
base, id_ = header[1:-1].rsplit('+', 1)
return urllib.unquote(id_)
def _serialize_request(self, request):
parsed = urlparse.urlparse(request.uri)
request_line = urlparse.urlunparse(
(None, None, parsed.path, parsed.params, parsed.query, None)
)
status_line = request.method + ' ' + request_line + ' HTTP/1.1\n'
major, minor = request.headers.get('content-type', 'application/json').split('/')
msg = MIMENonMultipart(major, minor)
headers = request.headers.copy()
if request.http is not None and hasattr(request.http.request,
'credentials'):
request.http.request.credentials.apply(headers)
if 'content-type' in headers:
del headers['content-type']
for key, value in headers.iteritems():
msg[key] = value
msg['Host'] = parsed.netloc
msg.set_unixfrom(None)
if request.body is not None:
msg.set_payload(request.body)
msg['content-length'] = str(len(request.body))
fp = StringIO.StringIO()
g = Generator(fp, maxheaderlen=0)
g.flatten(msg, unixfrom=False)
body = fp.getvalue()
if request.body is None:
body = body[:-2]
return status_line.encode('utf-8') + body
def _deserialize_response(self, payload):
status_line, payload = payload.split('\n', 1)
protocol, status, reason = status_line.split(' ', 2)
parser = FeedParser()
parser.feed(payload)
msg = parser.close()
msg['status'] = status
resp = httplib2.Response(msg)
resp.reason = reason
resp.version = int(protocol.split('/', 1)[1].replace('.', ''))
content = payload.split('\r\n\r\n', 1)[1]
return resp, content
def _new_id(self):
self._last_auto_id += 1
while str(self._last_auto_id) in self._requests:
self._last_auto_id += 1
return str(self._last_auto_id)
@util.positional(2)
def add(self, request, callback=None, request_id=None):
if request_id is None:
request_id = self._new_id()
if request.resumable is not None:
raise BatchError("Media requests cannot be used in a batch request.")
if request_id in self._requests:
raise KeyError("A request with this ID already exists: %s" % request_id)
self._requests[request_id] = request
self._callbacks[request_id] = callback
self._order.append(request_id)
def _execute(self, http, order, requests):
message = MIMEMultipart('mixed')
setattr(message, '_write_headers', lambda self: None)
for request_id in order:
request = requests[request_id]
msg = MIMENonMultipart('application', 'http')
msg['Content-Transfer-Encoding'] = 'binary'
msg['Content-ID'] = self._id_to_header(request_id)
body = self._serialize_request(request)
msg.set_payload(body)
message.attach(msg)
body = message.as_string()
headers = {}
headers['content-type'] = ('multipart/mixed; '
'boundary="%s"') % message.get_boundary()
resp, content = http.request(self._batch_uri, method='POST', body=body,
headers=headers)
if resp.status >= 300:
raise HttpError(resp, content, uri=self._batch_uri)
boundary, _ = content.split(None, 1)
header = 'content-type: %s\r\n\r\n' % resp['content-type']
for_parser = header + content
parser = FeedParser()
parser.feed(for_parser)
mime_response = parser.close()
if not mime_response.is_multipart():
raise BatchError("Response not in multipart/mixed format.", resp=resp,
content=content)
for part in mime_response.get_payload():
request_id = self._header_to_id(part['Content-ID'])
response, content = self._deserialize_response(part.get_payload())
self._responses[request_id] = (response, content)
@util.positional(1)
def execute(self, http=None):
if http is None:
for request_id in self._order:
request = self._requests[request_id]
if request is not None:
http = request.http
break
if http is None:
raise ValueError("Missing a valid http object.")
self._execute(http, self._order, self._requests)
redo_requests = {}
redo_order = []
for request_id in self._order:
resp, content = self._responses[request_id]
if resp['status'] == '401':
redo_order.append(request_id)
request = self._requests[request_id]
self._refresh_and_apply_credentials(request, http)
redo_requests[request_id] = request
if redo_requests:
self._execute(http, redo_order, redo_requests)
for request_id in self._order:
resp, content = self._responses[request_id]
request = self._requests[request_id]
callback = self._callbacks[request_id]
response = None
exception = None
try:
if resp.status >= 300:
raise HttpError(resp, content, uri=request.uri)
response = request.postproc(resp, content)
except HttpError, e:
exception = e
if callback is not None:
callback(request_id, response, exception)
if self._callback is not None:
self._callback(request_id, response, exception)
class HttpRequestMock(object):
    """Stand-in for HttpRequest that returns a canned response."""

    def __init__(self, resp, content, postproc):
        """Store the canned response.

        Args:
          resp: response object, or None to use an httplib2.Response with
            status 200 and reason 'OK'.
          content: body handed to postproc.
          postproc: callable invoked as postproc(resp, content).
        """
        if resp is None:
            resp = httplib2.Response({'status': 200, 'reason': 'OK'})
        self.resp = resp
        self.content = content
        self.postproc = postproc
        # Mirror a 'reason' entry onto the attribute of the same name.
        if 'reason' in resp:
            resp.reason = resp['reason']

    def execute(self, http=None):
        """Return postproc(resp, content); the `http` argument is ignored."""
        return self.postproc(self.resp, self.content)
class RequestMockBuilder(object):
    """Callable that builds HttpRequestMock objects from canned responses.

    It accepts the same arguments as the HttpRequest constructor.
    """

    def __init__(self, responses, check_unexpected=False):
        """Store the canned responses.

        Args:
          responses: dict mapping methodId to a sequence of
            (resp, content) or (resp, content, expected_body).
          check_unexpected: when True, a methodId missing from `responses`
            raises UnexpectedMethodError instead of yielding a default mock.
        """
        self.responses = responses
        self.check_unexpected = check_unexpected

    def __call__(self, http, postproc, uri, method='GET', body=None,
                 headers=None, methodId=None, resumable=None):
        """Return the mock request registered for `methodId`.

        Returns:
          An HttpRequestMock. For an unknown methodId (and check_unexpected
          False) it has no response, content '{}' and a JsonModel(False)
          post-processor.

        Raises:
          UnexpectedBodyError: if an expected body was registered and the
            supplied body does not match it.
          UnexpectedMethodError: for an unknown methodId when
            check_unexpected is True.
        """
        if methodId not in self.responses:
            if self.check_unexpected:
                raise UnexpectedMethodError(methodId=methodId)
            fallback_model = JsonModel(False)
            return HttpRequestMock(None, '{}', fallback_model.response)

        canned = self.responses[methodId]
        resp, content = canned[:2]
        if len(canned) > 2:
            expected_body = canned[2]
            # One side has a body and the other does not.
            if bool(expected_body) != bool(body):
                raise UnexpectedBodyError(expected_body, body)
            if isinstance(expected_body, str):
                expected_body = simplejson.loads(expected_body)
            body = simplejson.loads(body)
            if body != expected_body:
                raise UnexpectedBodyError(expected_body, body)
        return HttpRequestMock(resp, content, postproc)
class HttpMock(object):
    """Mock http object that always returns the same response."""

    def __init__(self, filename=None, headers=None):
        """Prepare the canned response.

        Args:
          filename: path of a file whose content is returned as the response
            body; when empty or None the body is None.
          headers: dict used to build the response; defaults to
            {'status': '200 OK'}.
        """
        if headers is None:
            headers = {'status': '200 OK'}
        if filename:
            # open() replaces the file() builtin; try/finally guarantees the
            # handle is closed even when read() fails.
            f = open(filename, 'r')
            try:
                self.data = f.read()
            finally:
                f.close()
        else:
            self.data = None
        self.response_headers = headers

        # Arguments of the most recent request() call.
        self.headers = None
        self.uri = None
        self.method = None
        self.body = None

    def request(self, uri,
                method='GET',
                body=None,
                headers=None,
                redirections=1,
                connection_type=None):
        """Record the call arguments and return the canned response.

        Returns:
          (httplib2.Response(response_headers), data).
        """
        self.uri = uri
        self.method = method
        self.body = body
        self.headers = headers
        return httplib2.Response(self.response_headers), self.data
class HttpMockSequence(object):
    """Mock http object that replays a list of canned responses in order.

    Each entry is a (headers, content) pair. Four special content values
    echo part of the request back instead: 'echo_request_headers',
    'echo_request_headers_as_json', 'echo_request_body' and
    'echo_request_uri'.
    """

    def __init__(self, iterable):
        """Store the response list.

        Args:
          iterable: list of (headers, content) pairs; entries are removed
            from the front as requests are made.
        """
        self._iterable = iterable
        self.follow_redirects = True

    def request(self, uri,
                method='GET',
                body=None,
                headers=None,
                redirections=1,
                connection_type=None):
        """Pop the next canned response and return it.

        Returns:
          (httplib2.Response(canned headers), content), where content may
          have been replaced by the echoed request data.
        """
        canned_headers, payload = self._iterable.pop(0)
        if payload == 'echo_request_headers':
            payload = headers
        elif payload == 'echo_request_headers_as_json':
            payload = simplejson.dumps(headers)
        elif payload == 'echo_request_body':
            # File-like bodies are read; anything else is echoed as-is.
            payload = body.read() if hasattr(body, 'read') else body
        elif payload == 'echo_request_uri':
            payload = uri
        return httplib2.Response(canned_headers), payload
def set_user_agent(http, user_agent):
    """Make every request sent through `http` carry a user-agent header.

    http.request is replaced by a wrapper. If the request headers already
    contain 'user-agent', `user_agent` is prepended to it, separated by a
    space; otherwise the header is set to `user_agent`.

    Args:
      http: object with a request() method.
      user_agent: string to put in the user-agent header.

    Returns:
      The same `http` object, with its request method replaced.
    """
    wrapped_request = http.request

    def new_request(uri, method='GET', body=None, headers=None,
                    redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                    connection_type=None):
        """Add the user-agent header, then call the original request()."""
        if headers is None:
            headers = {}
        headers['user-agent'] = (
            user_agent + ' ' + headers['user-agent']
            if 'user-agent' in headers else user_agent)
        response, payload = wrapped_request(uri, method, body, headers,
                                            redirections, connection_type)
        return response, payload

    http.request = new_request
    return http
def tunnel_patch(http):
    """Make `http` send PATCH requests as POST with a method override.

    http.request is replaced by a wrapper. A request whose method is
    'PATCH' is sent as 'POST' with the header 'x-http-method-override' set
    to 'PATCH'. A warning is logged when the authorization header contains
    'oauth_token'.

    Args:
      http: object with a request() method.

    Returns:
      The same `http` object, with its request method replaced.
    """
    wrapped_request = http.request

    def new_request(uri, method='GET', body=None, headers=None,
                    redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                    connection_type=None):
        """Rewrite PATCH to POST, then call the original request()."""
        if headers is None:
            headers = {}
        is_patch = (method == 'PATCH')
        if is_patch:
            authorization = headers.get('authorization', '')
            if 'oauth_token' in authorization:
                logging.warning(
                    'OAuth 1.0 request made with Credentials after tunnel_patch.')
            headers['x-http-method-override'] = "PATCH"
            method = 'POST'
        response, payload = wrapped_request(uri, method, body, headers,
                                            redirections, connection_type)
        return response, payload

    http.request = new_request
    return http