import os
import asyncio
import json
from urllib.parse import urlparse
import sys
import webview
import websockets
import re
import argparse
import ast
import webview.menu
# Optional shutdown hook: on_show() installs a callable here once the
# extension websocket is connected, and on_close() invokes it (if set)
# before terminating the process.
do_exit = None
def make_filters(filters):
    """Convert a ``{description: [patterns]}`` mapping into file-type strings.

    Each entry becomes ``'<description> (<pattern>;<pattern>...)'``.

    Args:
        filters: mapping from a description to an iterable of pattern
            strings; may be ``None`` or empty.

    Returns:
        A tuple of formatted strings in the mapping's iteration order, or an
        empty tuple when ``filters`` is falsy.
    """
    if not filters:
        return ()
    return tuple(
        '{} ({})'.format(description, ';'.join(patterns))
        for description, patterns in filters.items()
    )
def open_file_dialog(window, params, ex_id):
    """Show a single-selection open dialog and build the JSON reply.

    Args:
        window: object providing ``create_file_dialog`` (the webview window).
        params: mapping with ``'dir'`` (passed as the dialog directory) and
            ``'filters'`` (converted with ``make_filters``).
        ex_id: value echoed back as ``extension_id``.

    Returns:
        A JSON string whose ``openFileResponse`` is the first entry of the
        dialog result as a string, or ``''`` when the result is falsy.
    """
    start_dir = params['dir']
    filter_spec = params['filters']
    selection = window.create_file_dialog(
        webview.OPEN_DIALOG,
        directory=start_dir,
        allow_multiple=False,
        file_types=make_filters(filter_spec),
    )
    payload = {
        'type': 'extension_response',
        'extension_call': 'openFileResponse',
        'extension_id': ex_id,
        'openFileResponse': str(selection[0]) if selection else '',
    }
    return json.dumps(payload)
def open_files_dialog(window, params, ex_id):
    """Show a multi-selection open dialog and build the JSON reply.

    Args:
        window: object providing ``create_file_dialog`` (the webview window).
        params: mapping with ``'dir'`` (passed as the dialog directory) and
            ``'filters'`` (converted with ``make_filters``).
        ex_id: value echoed back as ``extension_id``.

    Returns:
        A JSON string whose ``openFilesResponse`` is the dialog result as a
        list, or ``[]`` when the result is falsy.
    """
    start_dir = params['dir']
    filter_spec = params['filters']
    selection = window.create_file_dialog(
        webview.OPEN_DIALOG,
        directory=start_dir,
        allow_multiple=True,
        file_types=make_filters(filter_spec),
    )
    payload = {
        'type': 'extension_response',
        'extension_call': 'openFilesResponse',
        'extension_id': ex_id,
        'openFilesResponse': list(selection) if selection else [],
    }
    return json.dumps(payload)
def open_dir_dialog(window, params, ex_id):
    """Show a folder-selection dialog and build the JSON reply.

    Args:
        window: object providing ``create_file_dialog`` (the webview window).
        params: mapping with ``'dir'`` (passed as the dialog directory).
        ex_id: value echoed back as ``extension_id``.

    Returns:
        A JSON string whose ``openDirResponse`` is the first entry of the
        dialog result as a string, or ``''`` when the result is falsy.
    """
    start_dir = params['dir']
    selection = window.create_file_dialog(
        webview.FOLDER_DIALOG,
        directory=start_dir,
        allow_multiple=False,
    )
    payload = {
        'type': 'extension_response',
        'extension_call': 'openDirResponse',
        'extension_id': ex_id,
        'openDirResponse': str(selection[0]) if selection else '',
    }
    return json.dumps(payload)
def save_file_dialog(window, params, ex_id):
    """Show a save dialog and build the JSON reply.

    Args:
        window: object providing ``create_file_dialog`` (the webview window).
        params: mapping with ``'dir'`` (passed as the dialog directory) and
            ``'filters'`` (converted with ``make_filters``).
        ex_id: value echoed back as ``extension_id``.

    Returns:
        A JSON string whose ``saveFileResponse`` is the dialog result joined
        into one string, or ``''`` when the result is falsy.
    """
    start_dir = params['dir']
    filter_spec = params['filters']
    selection = window.create_file_dialog(
        webview.SAVE_DIALOG,
        directory=start_dir,
        allow_multiple=False,
        file_types=make_filters(filter_spec),
    )
    # ''.join() yields the same text whether the result is a single string
    # or a sequence of strings.
    payload = {
        'type': 'extension_response',
        'extension_call': 'saveFileResponse',
        'extension_id': ex_id,
        'saveFileResponse': str(''.join(selection)) if selection else '',
    }
    return json.dumps(payload)
def menu_call(menu_id):
    """Report that the application-menu entry *menu_id* was activated.

    Sends a ``menu_event`` event message over the module-level
    ``extender_socket`` (published by ``on_show`` once the extension
    websocket is connected), driving the send with ``asyncio.run``.

    If the socket has not been published yet the event is reported on
    stderr and dropped, instead of failing with ``NameError``.

    Args:
        menu_id: identifier placed in the message's ``properties.menu_id``.
    """
    # NOTE(review): asyncio.run() creates a new event loop here while the
    # socket was opened on the loop started in on_show(); confirm that the
    # websockets connection tolerates being used from a different loop.
    socket = globals().get('extender_socket')
    if socket is None:
        print('Menu event dropped, extension socket is not connected:',
              menu_id, file=sys.stderr)
        return

    async def do_call():
        response = json.dumps({
            'type': 'event',
            'event': 'menu_event',
            'element': 'app menu', 'properties': {'menu_id': menu_id}})
        await socket.send(response)

    asyncio.run(do_call())
def test_type(var_, type_):
    """Validate that *var_* is an instance of *type_*.

    Args:
        var_: value to check.
        type_: a type, or tuple of types, as accepted by ``isinstance``.

    Raises:
        TypeError: if ``var_`` is not an instance of ``type_``.
    """
    if not isinstance(var_, type_):
        raise TypeError(
            'Bad variable {}: expected {}'.format(type(var_), type_))


# A validation helper, not a test case: keep test collectors that match on
# the ``test_`` prefix from picking it up.
test_type.__test__ = False
def create_menu(menu_def):
    """Build a list of ``webview.menu`` items from a parsed menu definition.

    Each entry of *menu_def* must be a dict with a ``'type'`` key:

    * ``'separator'`` -> ``MenuSeparator()``
    * ``'sub_menu'``  -> ``Menu(title, items)``; needs ``'title'`` (str) and
      ``'sub_menu'`` (list, converted recursively)
    * ``'action'``    -> ``MenuAction(title, callback)``; needs ``'title'``
      (str) and ``'action_id'`` (str); the callback calls
      ``menu_call(action_id)``

    Args:
        menu_def: iterable of menu item dicts.

    Returns:
        List of menu objects in the order given.

    Raises:
        TypeError: (from ``test_type``) if an entry or field has the wrong type.
        KeyError: if a required key is missing.
        ValueError: if an entry has an unknown ``'type'``.
    """
    menu = []
    for menu_item in menu_def:
        test_type(menu_item, dict)
        item_type = menu_item['type']
        if item_type == 'separator':
            menu.append(webview.menu.MenuSeparator())
        elif item_type == 'sub_menu':
            title = menu_item['title']
            test_type(title, str)
            sub_menu = menu_item['sub_menu']
            test_type(sub_menu, list)
            menu.append(webview.menu.Menu(title, create_menu(sub_menu)))
        elif item_type == 'action':
            title = menu_item['title']
            test_type(title, str)
            action_id = menu_item['action_id']
            test_type(action_id, str)
            # Bind action_id as a default argument: a plain closure would be
            # evaluated at click time and see the last id of this loop.
            menu.append(webview.menu.MenuAction(
                title,
                lambda action_id=action_id: menu_call(action_id)
            ))
        else:
            raise ValueError('Bad menu type {!r}'.format(item_type))
    return menu
def add_menu(menu_def):
    """Parse a JSON menu definition and convert it with ``create_menu``.

    Any exception raised while parsing or converting is reported on stderr
    together with the offending input, and an empty menu is returned.

    Args:
        menu_def: JSON text describing the menu.

    Returns:
        The list produced by ``create_menu``, or ``[]`` on any error.
    """
    try:
        return create_menu(json.loads(menu_def))
    except Exception as err:
        # Choose the same prefix the dedicated handlers would have printed.
        if isinstance(err, UnicodeDecodeError):
            headline = 'UnicodeDecodeError on menu:'
        elif isinstance(err, json.decoder.JSONDecodeError):
            headline = 'JSONDecodeError on menu:'
        else:
            headline = 'Error on menu:'
        print(headline, err, '\nWhen parsing:', menu_def, file=sys.stderr)
    return []
def resize(window, params):
    """Resize *window* so that its viewport gets the requested size.

    The viewport size is read via JavaScript, and the difference between it
    and the current window size is added to the requested dimensions.

    Args:
        window: webview window providing ``evaluate_js``, ``height``,
            ``width`` and ``resize``.
        params: mapping with the requested ``'width'`` and ``'height'``.
    """
    viewport_h = window.evaluate_js(r'Math.min(window.innerHeight, document.documentElement.clientHeight);')
    viewport_w = window.evaluate_js(r'Math.min(window.innerWidth, document.documentElement.clientWidth);')
    # Difference between the window size and its viewport.
    extra_h = window.height - viewport_h
    extra_w = window.width - viewport_w
    window.resize(params['width'] + extra_w, params['height'] + extra_h)
def set_title(window, params):
    """Set the title of *window* to ``params['title']``."""
    window.set_title(params['title'])
def _dispatch_extension_call(window, call_id, params, ex_id):
    """Run one extension call and return its JSON reply, or None if it has none."""
    if call_id == 'resize':
        resize(window, params)
    elif call_id == 'setTitle':
        set_title(window, params)
    elif call_id == 'openFile':
        return open_file_dialog(window, params, ex_id)
    elif call_id == 'openFiles':
        return open_files_dialog(window, params, ex_id)
    elif call_id == 'saveFile':
        return save_file_dialog(window, params, ex_id)
    elif call_id == 'openDir':
        return open_dir_dialog(window, params, ex_id)
    # 'setAppIcon', 'ui_info' and unrecognised calls are accepted as no-ops.
    return None


def on_show(window, host, port):
    """Serve extension requests for *window* until the connection ends.

    Connects to ``ws://<host>:<port>/gemgui/extension``, announces itself
    with an ``extensionready`` message and then processes incoming JSON
    messages:

    * ``exit_request`` / ``close_request`` destroy the window,
    * ``extension`` messages are dispatched by ``extension_call`` and any
      reply is sent back on the socket,
    * everything else is ignored.

    The function blocks (it drives its own event loop with ``asyncio.run``)
    and publishes the socket as the module-level ``extender_socket`` and a
    shutdown hook as the module-level ``do_exit``.

    Args:
        window: the webview window to control.
        host: server host name.
        port: server port.
    """
    ws_uri = 'ws://{}:{}/gemgui/extension'.format(host, port)
    window_destroyed = False

    async def extender():
        nonlocal window_destroyed
        global extender_socket, do_exit
        async with websockets.connect(ws_uri,
                                      close_timeout=None,
                                      ping_interval=None,
                                      compression=None) as ws:
            extender_socket = ws
            loop = asyncio.get_running_loop()

            def destroy_window():
                # Skip when an exit/close request already destroyed it.
                if not window_destroyed:
                    window.minimize()
                    window.destroy()

            def exit_f():
                pass

            do_exit = exit_f

            try:
                await ws.send(json.dumps({'type': 'extensionready'}))
            except Exception as e:
                print(f"Initial send failed {e}")
                return

            while True:
                # recv() runs as a task; a cancellation is handled below.
                receive = loop.create_task(ws.recv())
                try:
                    doc = await receive
                except asyncio.CancelledError:
                    destroy_window()
                    await ws.close()
                    return
                except websockets.ConnectionClosed as e:
                    # Covers both abnormal and normal closure, so a clean
                    # close by the peer does not escape as an exception.
                    if not window_destroyed:
                        print(f"Connection closed: {ws_uri} due {e}")
                    destroy_window()
                    return

                if not isinstance(doc, str):
                    continue

                try:
                    obj = json.loads(doc)
                except UnicodeDecodeError as e:
                    print('UnicodeDecodeError on extender:', e, '\nWhen parsing:', doc[:1000], file=sys.stderr)
                    return
                except json.decoder.JSONDecodeError as e:
                    # The bare text "entered" is tolerated and skipped.
                    if doc == "entered":
                        continue
                    print('JSONDecodeError on extender:', e, '\nWhen parsing:', doc[:1000], file=sys.stderr)
                    return

                if not isinstance(obj, dict):
                    print('Invalid JS object', doc[:1000])
                    continue

                msg_type = obj.get('type')
                if msg_type in ('exit_request', 'close_request'):
                    window_destroyed = True
                    window.destroy()
                    continue
                if msg_type != 'extension':
                    continue

                try:
                    response = _dispatch_extension_call(
                        window,
                        obj.get('extension_call'),
                        obj.get('extension_params'),
                        obj.get('extension_id'))
                except (KeyError, TypeError) as e:
                    # One malformed request must not take the loop down.
                    print('Invalid extension call:', repr(e), '\nWhen handling:', doc[:1000], file=sys.stderr)
                    continue
                if response:
                    await ws.send(response)

    asyncio.run(extender())
def on_close():
    """Run the installed shutdown hook, if any, then end the process.

    The process is terminated with ``os._exit(0)``, i.e. immediately and
    without normal interpreter cleanup.
    """
    exit_hook = do_exit
    if exit_hook:
        exit_hook()
    os._exit(0)
def _parse_extra(extra_arg):
    """Parse the ``--gempyre-extra`` string into a dict of keyword arguments.

    The string is a ``;``-separated list of ``name=literal`` entries; each
    literal is evaluated with ``ast.literal_eval``.  Malformed entries are
    reported and skipped so that one bad entry does not abort start-up.
    """
    parsed = {}
    for entry in extra_arg.split(';'):
        if not entry.strip():
            continue  # tolerate empty segments such as a trailing ';'
        m = re.match(r'^\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*=\s*(.*)\s*$', entry)
        if m is None:
            print("Invalid parameter in", entry, ": expected name=value")
            continue
        ex_name, e_lit = m[1], m[2]
        try:
            parsed[ex_name] = ast.literal_eval(e_lit)
        except (ValueError, SyntaxError) as err_s:
            print("Invalid parameter in", e_lit, "of", entry, ": ", err_s)
    return parsed


def main():
    """Parse the command line, create the webview window and start the GUI.

    Recognised options: ``--gempyre-url`` (required), ``--gempyre-width``
    (default 1024), ``--gempyre-height`` (default 768), ``--gempyre-title``,
    ``--gempyre-extra`` (``name=literal;...`` keyword arguments forwarded to
    ``webview.start``), ``--gempyre-flags`` (bit mask, see the constants
    below), ``--gempyre-menu`` (JSON menu definition) and ``-c`` (accepted
    and ignored).

    Exits through ``parser.error`` when no URL is given.
    """
    # Bit values understood in --gempyre-flags.
    NORESIZE = 0x1
    FULLSCREEN = 0x2
    HIDDEN = 0x4
    FRAMELESS = 0x8
    MINIMIZED = 0x10
    ONTOP = 0x20
    CONFIRMCLOSE = 0x40
    TEXTSELECT = 0x80
    EASYDRAG = 0x100
    TRANSPARENT = 0x200

    parser = argparse.ArgumentParser()
    parser.add_argument('--gempyre-url', type=str)
    parser.add_argument('--gempyre-width', type=int)
    parser.add_argument('--gempyre-height', type=int)
    parser.add_argument('--gempyre-title', type=str)
    parser.add_argument('--gempyre-extra', type=str)
    parser.add_argument('--gempyre-flags', type=int)
    parser.add_argument('-c', action='store_true')
    parser.add_argument('--gempyre-menu', type=str)
    args = parser.parse_args()

    uri_string = args.gempyre_url
    if not uri_string:
        # Without a URL there is nothing to show and no server to talk to.
        parser.error('--gempyre-url is required')

    width = args.gempyre_width or 1024
    height = args.gempyre_height or 768
    title = args.gempyre_title or ''
    flags = args.gempyre_flags or 0

    menu = add_menu(args.gempyre_menu) if args.gempyre_menu else []

    extra = {}
    if sys.platform == 'win32':
        extra['gui'] = 'cef'
    if args.gempyre_extra:
        # Applied after the platform default so the caller can override it.
        extra.update(_parse_extra(args.gempyre_extra))

    uri = urlparse(uri_string)
    window = webview.create_window(
        title, url=uri_string, width=width, height=height,
        resizable=not flags & NORESIZE,
        fullscreen=bool(flags & FULLSCREEN),
        hidden=bool(flags & HIDDEN),
        frameless=bool(flags & FRAMELESS),
        minimized=bool(flags & MINIMIZED),
        on_top=bool(flags & ONTOP),
        confirm_close=bool(flags & CONFIRMCLOSE),
        text_select=bool(flags & TEXTSELECT),
        easy_drag=bool(flags & EASYDRAG),
        transparent=bool(flags & TRANSPARENT))

    # Window objects expose their events either under ``window.events`` or
    # directly on the window; support both.
    if hasattr(window, 'events'):
        window.events.shown += lambda: on_show(window, uri.hostname, uri.port)
        window.events.closing += on_close
    else:
        window.shown += lambda: on_show(window, uri.hostname, uri.port)
        window.closing += on_close

    webview.start(menu=menu, **extra)
# Start the client only when this file is executed as a script.
if __name__ == '__main__':
    main()