gemgui 0.5.1

GUI application library
Documentation
import os
import asyncio
import json
from urllib.parse import urlparse
import sys
import webview
import websockets
import re
import argparse
import ast
import webview.menu

'''
import logging
logger = logging.getLogger('websockets')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
'''

do_exit = None

# file_types = ('Image Files (*.bmp;*.jpg;*.gif)', 'All files (*.*)')
def make_filters(filters):
    if not filters:
        return tuple()
    filters_list = []
    for k, f in filters.items():
        filter_string = '{} ({})'.format(k, ';'.join(f))
        filters_list.append(filter_string)
    return tuple(filters_list)

def open_file_dialog(window, params, ex_id):
    dir_name = params['dir']
    filters = params['filters']

    result = window.create_file_dialog(webview.OPEN_DIALOG,
                                        directory=dir_name,
                                        allow_multiple=False,
                                        file_types=make_filters(filters))
    response = json.dumps({
        'type': 'extension_response',
        'extension_call': 'openFileResponse',
        'extension_id': ex_id,
        'openFileResponse': str(result[0]) if result else ''})
    return response
        

def open_files_dialog(window, params, ex_id):
    dir_name = params['dir']
    filters = params['filters']
    result = window.create_file_dialog(webview.OPEN_DIALOG,
                                        directory=dir_name,
                                        allow_multiple=True,
                                        file_types=make_filters(filters))
    response = json.dumps({
        'type': 'extension_response',
        'extension_call': 'openFilesResponse',
        'extension_id': ex_id,
        'openFilesResponse': list(result) if result else []})
    return response
        

def open_dir_dialog(window, params, ex_id): 
    dir_name = params['dir']
    result = window.create_file_dialog(webview.FOLDER_DIALOG,
                                        directory=dir_name,
                                        allow_multiple=False)
    response = json.dumps({
        'type': 'extension_response',
        'extension_call': 'openDirResponse',
        'extension_id': ex_id,
        'openDirResponse': str(result[0]) if result else ''})
    return response


def save_file_dialog(window, params, ex_id):
    dir_name = params['dir']
    filters = params['filters']
    result = window.create_file_dialog(webview.SAVE_DIALOG,
                                        directory=dir_name,
                                        allow_multiple=False,
                                        file_types=make_filters(filters))
    response = json.dumps({
        'type': 'extension_response',
        'extension_call': 'saveFileResponse',
        'extension_id': ex_id,
        'saveFileResponse': str(''.join(result)) if result else ''})
    return response


def menu_call(menu_id):
    async def do_call():
        response = json.dumps({
            'type': 'event',
            'event': 'menu_event',
            'element': 'app menu', # this should be illegal HTML id
            'properties': {'menu_id': menu_id}})
        await extender_socket.send(response)
    
    asyncio.run(do_call())
        

def test_type(var_, type_):
    if not isinstance(var_, type_):
        raise 'Bad variable' + type(var_) 


def create_menu(menu_def):
    menu = []
    for menu_item in menu_def:
        test_type(menu_item, dict)
        type = menu_item['type']
        if type == 'separator':
            menu.append(webview.menu.MenuSeparator())
        elif type == 'sub_menu':
            title = menu_item['title']
            test_type(title, str)
            sub_menu = menu_item['sub_menu']
            test_type(sub_menu, list)
            sub_menu_list = create_menu(sub_menu)
            menu.append(webview.menu.Menu(title, sub_menu_list))
        elif type == 'action':
            title = menu_item['title']
            test_type(title, str)
            action_id = menu_item['action_id']
            test_type(action_id, str)
            menu.append(webview.menu.MenuAction(
                title,
                lambda: menu_call(action_id)
            ))
        else:
            raise("Bad menu type ", type)
    return menu    


def add_menu(menu_def):
    try:
        menu = json.loads(menu_def)
        return create_menu(menu)
    except UnicodeDecodeError as e:
        print('UnicodeDecodeError on menu:', e, '\nWhen parsing:', menu_def, file=sys.stderr)
    except json.decoder.JSONDecodeError as e:
        print('JSONDecodeError on menu:', e, '\nWhen parsing:', menu_def, file=sys.stderr)
    except Exception as e:
        print('Error on menu:', e, '\nWhen parsing:', menu_def, file=sys.stderr)    
    return []       
    

def resize(window, params):
     # window.resize include titlebar, so we get current body and get title height, so we can add it to get requested body size
    # known issue, does not work well with FRAMELESS. Fix someday.
    vp_height = window.evaluate_js(r'Math.min(window.innerHeight, document.documentElement.clientHeight);')
    vp_width = window.evaluate_js(r'Math.min(window.innerWidth, document.documentElement.clientWidth);')
    border_height = window.height - vp_height
    border_width = window.width - vp_width
    width = params['width']
    height = params['height']
    window.resize(width + border_width, height + border_height)


def set_title(window, params):
    title = params['title']
    window.set_title(title)

def on_show(window, host, port):
    ws_uri = 'ws://{}:{}/gemgui/extension'.format(host, port)
    window_destroyed = False

    async def extender():    
        async with websockets.connect(ws_uri,
                                    close_timeout=None,
                                    ping_interval=None,
                                    compression=None) as ws:
            global extender_socket
            extender_socket = ws
            nonlocal window_destroyed
            loop = asyncio.get_event_loop()
            #receive = loop.create_task(ws.recv())
            
            def destroy_window():
                if not window_destroyed:
                    window.minimize()  # it takes some time
                    window.destroy()
                return
            
            

            def exit_f():
                pass
                ##loop.run_until_complete(ws.close)
                #nonlocal receive
                #receive.cancel()

            global do_exit
            do_exit = exit_f

            try:
                await ws.send(json.dumps({'type': 'extensionready'}))
            except Exception as e:
                print(f"Initial send failed {e}") 
                return   

            while True:
                receive = loop.create_task(ws.recv())
                try:
                    await receive
                except asyncio.CancelledError:
                    destroy_window()
                    await ws.close()
                    return
                except websockets.ConnectionClosedError as e:
                    if not window_destroyed:
                        print(f"Connection closed: {ws_uri} due {e}")
                        destroy_window()
                    return

                doc = receive.result()

                if(not isinstance(doc, str)):
                    continue

                
                try:
                    obj = json.loads(doc)
                except UnicodeDecodeError as e:
                    print('UnicodeDecodeError on extender:', e, '\nWhen parsing:', doc[:1000], file=sys.stderr)
                    return
                except json.decoder.JSONDecodeError as e:
                    if(doc == "entered"): 
                        continue # gemgui internal 
                    print('JSONDecodeError on extender:', e, '\nWhen parsing:', doc[:1000], file=sys.stderr)
                    return 
                
                if not type(obj) is dict:
                    print('Invalid JS object', doc[:1000])
                    continue

                if obj['type'] == 'exit_request' or obj['type'] == 'close_request':
                    window_destroyed = True
                    window.destroy()
                    #ws.close()
                    # loop.stop()
                    #return
                    continue

                if obj['type'] != 'extension':
                    continue

                call_id = obj['extension_call']
                params = obj['extension_params']
               
                response = None
                
                if call_id == 'setAppIcon':
                    pass
                
                if call_id == 'resize':
                    resize(window, params)
                   
                if call_id == 'setTitle':
                   set_title(window, params)

                if call_id == 'ui_info':
                    pass

                ex_id = obj['extension_id']    
                if call_id == 'openFile':
                    response = open_file_dialog(window, params, ex_id)

                if call_id == 'openFiles':
                    response = open_files_dialog(window, params, ex_id)

                if call_id == 'saveFile':
                    response = save_file_dialog(window, params, ex_id)

                if call_id == 'openDir':
                    response = open_dir_dialog(window, params, ex_id)            

                if response:
                    await ws.send(response)

    asyncio.run(extender())

def on_close():
    if do_exit:
        do_exit()
    os._exit(0) # pyvwebview is very slow to close sockets.



def main():
    width = 1024
    height = 768
    title = ''
    extra = {}

    NORESIZE = 0x1
    FULLSCREEN = 0x2
    HIDDEN = 0x4
    FRAMELESS = 0x8
    MINIMIZED = 0x10
    ONTOP = 0x20
    CONFIRMCLOSE = 0x40
    TEXTSELECT = 0x80
    EASYDRAG = 0x100
    TRANSPARENT = 0x200

    flags = 0

    parser = argparse.ArgumentParser()
    parser.add_argument('--gempyre-url', type=str)
    parser.add_argument('--gempyre-width', type=int)
    parser.add_argument('--gempyre-height', type=int)
    parser.add_argument('--gempyre-title', type=str)
    parser.add_argument('--gempyre-extra', type=str)
    parser.add_argument('--gempyre-flags', type=int)
   # parser.add_argument('url', type=str)
    parser.add_argument('-c', action='store_true') # clean off
    parser.add_argument('--gempyre-menu', type=str)

    try:
        args = parser.parse_args()
    except argparse.ArgumentError:
        pass

    if args.gempyre_width:
        width = int(args.gempyre_width)

    if args.gempyre_height:
        height = int(args.gempyre_height)

    if args.gempyre_title:
        title = args.gempyre_title

    if args.gempyre_url:
        uri_string = args.gempyre_url
    elif args.url:
        uri_string = args.url

    if args.gempyre_flags:
        flags = args.gempyre_flags

    menu = []    

    if args.gempyre_menu:
        menu = add_menu(args.gempyre_menu)


    if sys.platform == 'win32':
        extra['gui'] = 'cef'

    if args.gempyre_extra:
        for e in args.gempyre_extra.split(';'):
            m = re.match(r'^\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*=\s*(.*)\s*$', e)
            ex_name = m[1]
            e_lit = m[2]
            try:
                ex_value = ast.literal_eval(e_lit)
                extra[ex_name] = m[ex_value]
            except ValueError as err_s:
                print("Invalid parameter in", e_lit, "of", e, ": ", err_s)

    uri = urlparse(uri_string)
    window = webview.create_window(title, url=uri_string, width=width, height=height,
    resizable = True if not flags & NORESIZE else False,
    fullscreen = True if flags & FULLSCREEN else False,
    hidden = True if flags & HIDDEN else False,
    frameless = True if flags & FRAMELESS else False,
    minimized = True if flags & MINIMIZED else False,
    on_top = True if flags & ONTOP else False,
    confirm_close = True if flags & CONFIRMCLOSE else False,
    text_select = True if flags & TEXTSELECT else False,
    easy_drag = True if flags & EASYDRAG else False,
    transparent = True if flags & TRANSPARENT else False)
    if hasattr(window, 'events'): # version compliancy
        window.events.shown += lambda: on_show(window, uri.hostname, uri.port)
        window.events.closing += on_close
    else:
        window.shown += lambda: on_show(window, uri.hostname, uri.port)
        window.closing += on_close
    webview.start(menu=menu, **extra)

    
if __name__ == '__main__':
    main()