import unittest
import webtest
import apprtc
import compute_page
from test_util import CapturingFunction
from test_util import ReplaceFunction
from google.appengine.ext import testbed
class FakeComputeService(object):
def __init__(self):
self.instances = CapturingFunction(lambda: self.instances)
self.instances.start = CapturingFunction(lambda: self.instances.start)
self.instances.start.execute = CapturingFunction()
self.instances.stop = CapturingFunction(lambda: self.instances.stop)
self.instances.stop.execute = CapturingFunction()
self.get_return_value = None
self.instances.get = CapturingFunction(lambda: self.instances.get)
self.instances.get.execute = CapturingFunction(
lambda: self.get_return_value)
class ComputePageHandlerTest(unittest.TestCase):
def setUp(self):
self.testbed = testbed.Testbed()
self.testbed.activate()
self.testbed.init_taskqueue_stub()
self.test_app = webtest.TestApp(apprtc.app)
self.build_compute_service_replacement = ReplaceFunction(
compute_page.ComputePage,
'_build_compute_service',
self.fake_build_compute_service)
self.compute_service = FakeComputeService()
self.app_id = 'testbed-test'
def fake_build_compute_service(self, *args):
return self.compute_service
def tearDown(self):
self.testbed.deactivate()
del self.build_compute_service_replacement
def makePostRequest(self, path, body=''):
return self.test_app.post(path, body, headers={'User-Agent': 'Safari'})
def makeGetRequest(self, path):
return self.test_app.get(path, headers={'User-Agent': 'Safari'})
def testGetStatus(self):
instance = 'test-instance'
zone = 'test-zone'
instance_status = compute_page.COMPUTE_STATUS_RUNNING
self.compute_service.get_return_value = {
compute_page.COMPUTE_STATUS: instance_status
}
response = self.makeGetRequest('/compute/%s/%s/%s' %
(compute_page.ACTION_STATUS, instance, zone))
get_dict = {'project': self.app_id,
'instance': instance,
'zone': zone}
actual_get_dict = self.compute_service.instances.get.last_kwargs
self.assertEqual(get_dict, actual_get_dict)
self.assertEqual(instance_status, response.body)
def testPostStartWhenTerminated(self):
instance = 'test-instance'
zone = 'test-zone'
start_url = '/compute/%s/%s/%s' % (compute_page.ACTION_START,
instance, zone)
instance_status = compute_page.COMPUTE_STATUS_TERMINATED
self.compute_service.get_return_value = {
compute_page.COMPUTE_STATUS: instance_status
}
self.makePostRequest(start_url)
tasks = self.testbed.get_stub('taskqueue').GetTasks('default')
self.assertEqual(1, len(tasks))
self.assertEqual(1, self.compute_service.instances.start.num_calls)
get_dict = {'project': self.app_id,
'instance': instance,
'zone': zone}
actual_get_dict = self.compute_service.instances.start.last_kwargs
self.assertEqual(get_dict, actual_get_dict)
def testPostStartWhenRunning(self):
instance = 'test-instance'
zone = 'test-zone'
start_url = '/compute/%s/%s/%s' % (compute_page.ACTION_START,
instance, zone)
instance_status = compute_page.COMPUTE_STATUS_RUNNING
self.compute_service.get_return_value = {
compute_page.COMPUTE_STATUS: instance_status
}
self.makePostRequest(start_url)
tasks = self.testbed.get_stub('taskqueue').GetTasks('default')
self.assertEqual(0, len(tasks))
self.assertEqual(0, self.compute_service.instances.start.num_calls)
def testPostStartNotRunningOrTerminated(self):
compute_instances = self.compute_service.instances
taskqueue = self.testbed.get_stub('taskqueue')
instance = 'test-instance'
zone = 'test-zone'
start_url = '/compute/%s/%s/%s' % (compute_page.ACTION_START,
instance, zone)
for instance_status in ['PROVISIONING', 'STAGING', 'STOPPING']:
self.compute_service.get_return_value = {
compute_page.COMPUTE_STATUS: instance_status
}
self.makePostRequest(start_url)
tasks = taskqueue.GetTasks('default')
self.assertEqual(1, len(tasks))
task = tasks[-1]
self.assertEqual(start_url, task['url'])
self.assertEqual(0, self.compute_service.instances.start.num_calls)
taskqueue.FlushQueue('default')
def testPostRestartWhenRunning(self):
compute_instances = self.compute_service.instances
taskqueue = self.testbed.get_stub('taskqueue')
instance = 'test-instance'
zone = 'test-zone'
restart_url = '/compute/%s/%s/%s' % (compute_page.ACTION_RESTART,
instance, zone)
self.compute_service.get_return_value = {
compute_page.COMPUTE_STATUS: 'RUNNING'
}
self.makePostRequest(restart_url)
tasks = taskqueue.GetTasks('default')
self.assertEqual(1, len(tasks))
task = tasks[-1]
start_url = '/compute/%s/%s/%s' % (compute_page.ACTION_START,
instance, zone)
self.assertEqual(start_url, task['url'])
self.assertEqual(1, self.compute_service.instances.stop.num_calls)
def testPostRestartWhenNotRunning(self):
compute_instances = self.compute_service.instances
taskqueue = self.testbed.get_stub('taskqueue')
instance = 'test-instance'
zone = 'test-zone'
restart_url = '/compute/%s/%s/%s' % (compute_page.ACTION_RESTART,
instance, zone)
for status in ['PROVISIONING', 'STAGING', 'STOPPING', 'TERMINATED']:
self.compute_service.get_return_value = {
compute_page.COMPUTE_STATUS: status
}
self.makePostRequest(restart_url)
tasks = taskqueue.GetTasks('default')
self.assertEqual(0, len(tasks))
self.assertEqual(0, self.compute_service.instances.stop.num_calls)