import os
import re
import logging
import json
from time import sleep
from pathlib import Path
from itertools import count, chain
from mwclient import Site
# Optional OAuth bearer token for the MediaWiki API (unauthenticated if unset).
API_TOKEN = os.getenv("MW_API_TOKEN")
# Seconds to pause between page fetches (simple throttle).
SLEEP = 0
# Output directory for the scraped JSON files, next to this script.
CGROUPS_DIR = Path(__file__).parent / "cgroups"
logger = logging.getLogger(__name__)
# `name = "..."` / `description = "..."` assignments inside Lua CGroup modules.
REGEX_MODULE_NAME = re.compile(r"""name\s*=\s*(['"])(?P<name>.+?)(\1)""")
REGEX_MODULE_DESCRIPTION = re.compile(r"""description\s*=\s*(['"])(?P<desc>.*?)(\1)""")
# {{CGroupH|name=...|desc=...}} header used by template-based CGroups.
REGEX_TEMPLATE_HEADER = re.compile(
    r"""{{\s*CGroupH\s*\|\s*name\s*=\s*(?P<name>[^|]+)\s*\|\s*desc\s*=\s*(?P<desc>.*?)\s*}}"""
)
# {{lang|xx|text}} wrappers, unwrapped to bare text before parsing rules.
# Fixed character class: was [a-zA-z], whose A-z range also matched [ \ ] ^ _ `.
REGEX_LANG = re.compile(r"\{\{lang\|[a-zA-Z]{2}\|([^}]+)}}")
# {{CItem...|original=...|conv}} style rules (original before conv).
REGEX_RULE5 = re.compile(
    r"""{{\s*(CI(tem(Hidden)?)?|CNoteA)\s*\|(\s*desc\s*=\s*[^|]*\s*\|)?\s*original\s*=\s*(?P<original>[^|]*?)\s*(\|\s*desc\s*=\s*[^|]*\s*)?\|\s*(1=)?\s*(?P<conv>[^}]+)(\|\s*desc\s*=\s*[^|]*\s*)?(\|\s*)?}}"""
)
# {{CItem...|conv|original=...}} style rules (conv first, original optional).
REGEX_RULE6 = re.compile(
    r"""{{\s*(CI(tem(Hidden)?)?|CNoteA)\s*(\|\s*desc\s*=\s*[^|]*\s*)?\|\s*(1=)?\s*(?P<conv>[^|]+)\s*(\|\s*desc\s*=\s*[^|]*\s*)?(\|\s*original\s*=\s*(?P<original>.*?))?\s*(\|\s*desc\s*=\s*[^|]*\s*)?(\|\s*)?}}"""
)
# {{CItemLan|conv|original}} positional rule.
REGEX_RULE7 = re.compile(
    r"""{{\s*CItemLan\s*\|\s*([12]=)?\s*(?P<conv>[^|]+)\s*(\|\s*([12]=)?\s*(?P<original>.*?))?\s*(\|\s*)?}}"""
)
# {{CItemLan/R|original|conv}} reversed positional rule.
REGEX_RULE8 = re.compile(
    r"""{{\s*CItemLan\/R\s*\|\s*([12]=)?\s*(?P<original>[^|]+)\|\s*([12]=)?\s*(?P<conv>.*?)\s*(\|\s*)?}}"""
)
# <span ...>c</span> wrappers around special characters; keep only the text.
REGEX_SPECIAL_CHAR_NOTICE = re.compile(r"""<span[^>]*>([^>]+)<\/span>""")
def cgroup_modules(site):
    """Yield titles of top-level Lua CGroup module pages found via search.

    Skips infrastructure pages (core/preload/sandbox), sub-pages, pages too
    small to hold rules, and thin wrappers that just require() another module.

    Note: the pasted source had statements fused onto single lines here
    (``continue if ...:``); this restores the intended control flow.
    """
    for entry in site.search("Module:CGroup/"):
        title = entry["title"]
        if title in {
            "Module:CGroup/core",
            "Module:CGroup/preload",
            "Module:CGroup/sandbox",
            "Module:CGroup/Sandbox",
        }:
            continue
        # Search can return unrelated hits; keep only CGroup module pages.
        if not title.startswith("Module:CGroup/"):
            continue
        if "/" in title.removeprefix("Module:CGroup/"):
            logger.info(f"Skip {title} (sub page)")
            continue
        # Pages under 66 bytes or delegating via require() carry no rules.
        if entry["size"] < 66 or "return require(" in entry["snippet"]:
            logger.info(f"Skip {title} (too small or redirecting)")
            continue
        yield title
def cgroup_templates(site):
    """Yield titles of top-level CGroup template pages found via search.

    Skips documentation/infrastructure pages, sub-pages, pages too small to
    hold rules, and redirects (Chinese or English redirect markers).

    Note: the pasted source had statements fused onto single lines here
    (``continue if ...:``); this restores the intended control flow.
    """
    for entry in site.search("Template:CGroup"):
        title = entry["title"]
        if title in {
            "Template:CGroup/doc",
            "Template:CGroup/list",
            "Template:CGroup/preload",
            "Template:CGroup/sandbox",
            "Template:CGroup/CHead",
            "Template:CGroup/editintro",
            "Template:CGroup/New Style",
        }:
            continue
        # Search can return unrelated hits; keep only CGroup template pages.
        if not title.startswith("Template:CGroup/"):
            continue
        if "/" in title.removeprefix("Template:CGroup/"):
            logger.info(f"Skip {title} (sub page)")
            continue
        if (
            entry["size"] < 66
            or "#重定向" in entry["snippet"]
            or "#REDIRECT" in entry["snippet"]
        ):
            logger.info(f"Skip {title} (too small or redirecting)")
            continue
        yield title
def parse_module_line(s):
    """Parse one line of a Lua CGroup module into a rule tuple.

    Recognizes two Lua forms (both evaluated by abusing Python syntax, which
    happens to accept Lua's ``key = value`` argument lists):
      * ``{ original = '...', rule = '...' }``  -> ``(original, rule)``
      * ``Item('original', 'rule', ...)``       -> the positional tuple

    Returns None for comments (``--``) and anything unparseable.

    Note: the pasted source had statements fused onto single lines here;
    this restores the intended control flow.
    """

    def parse_lua_args(s):
        # Evaluate "a, b, key=val" as a call to r(), capturing (args, kwargs).
        # Builtins are stripped and Lua's `nil` mapped to None; still, this is
        # eval on wiki content — acceptable for a scraping script, but flagged.
        def r(*args, **kwargs):
            return (args, kwargs)

        try:
            return eval(f"r({s})", {"__builtins__": {}, "r": r, "nil": None})
        except (SyntaxError, NameError):
            return None

    s = s.strip().rstrip(",")
    if not s.startswith("--"):
        if s.startswith("{") and s.endswith("}"):
            s = s[1:-1]
            if args := parse_lua_args(s):
                args = args[1]  # keyword arguments of the Lua table
                if "original" in args or "rule" in args:
                    return (args.get("original", ""), args.get("rule", ""))
        elif s.startswith("Item(") and s.endswith(")"):
            s = s[5:-1]
            if (args := parse_lua_args(s)) and len(args[0]) >= 2:
                return args[0]  # positional (original, rule, ...) tuple
    return None
def parse_template_line(s):
    """Extract an (original, conv) rule pair from one template source line.

    Tries each CItem/CNoteA/CItemLan pattern in turn and returns the named
    groups of the first match; falsy (no match) otherwise.
    """
    for pattern in (REGEX_RULE5, REGEX_RULE6, REGEX_RULE7, REGEX_RULE8):
        match = pattern.search(s)
        if match:
            return (match.group("original"), match.group("conv"))
    return None
def main():
    """Fetch every CGroup from zh.wikipedia.org and write one JSON file per
    group into CGROUPS_DIR ({name, description, path, rules}).

    Already-downloaded groups are skipped; empty and failed groups are
    collected and reported at the end.
    """
    logging.basicConfig(level=os.environ.get("LOGLEVEL") or logging.INFO)
    logger.info("Up and running...")
    logger.info("Do not forget to clear existing cgroups if necessary.")
    # Fix: the old code tested CWD-relative "./cgroups" but created
    # CGROUPS_DIR (file-relative); make both refer to CGROUPS_DIR.
    CGROUPS_DIR.mkdir(exist_ok=True)
    auth_headers = {"Authorization": f"Bearer {API_TOKEN}"} if API_TOKEN else {}
    site = Site(
        "zh.wikipedia.org",
        clients_useragent="zhconvs-rs/0.0",
        custom_headers=auth_headers,
    )
    emptys = []
    fails = []
    nth = 0  # fix: keep the summary line from raising NameError on zero results
    for nth, title in enumerate(
        chain(cgroup_modules(site), cgroup_templates(site)), start=1
    ):
        logger.info(f"Processing no.{nth} {title}")
        try:
            pname = (title or "").split("/")[-1].strip()
            if not pname:
                logger.warning(f"Path invalid: {title}")
                fails.append(title)
                continue
            if (CGROUPS_DIR / f"{pname}.json").exists():
                logger.info(f"Skip {title} (file already exists)")
                continue
            sleep(SLEEP)
            page = site.pages[title]
            # Unwrap {{lang|xx|...}} markers before any other parsing.
            text = REGEX_LANG.sub(r"\1", page.text())
            # Modules carry name/description as Lua assignments; templates
            # carry them in a {{CGroupH}} header.
            if (
                title.startswith("Module:")
                and (mname := REGEX_MODULE_NAME.search(text))
                and (mdesc := REGEX_MODULE_DESCRIPTION.search(text))
            ):
                name = str(mname.group("name"))
                description = str(mdesc.group("desc"))
            elif mheader := REGEX_TEMPLATE_HEADER.search(text):
                name = mheader.group("name")
                description = mheader.group("desc")
            else:
                fails.append(title)
                logger.warning("Failed to parse " + title)
                continue
            match_rule = (
                parse_module_line
                if title.startswith("Module:")
                else parse_template_line
            )
            rules = []
            for mrule in filter(None, map(match_rule, text.split("\n"))):
                # Undo the {{=}} escape and strip <span> wrappers around
                # special characters in the conversion text.
                conv = REGEX_SPECIAL_CHAR_NOTICE.sub(
                    r"\1", mrule[1].replace("{{=}}", "=")
                )
                rules.append({"original": mrule[0], "conv": conv})
            cgroup = {
                "name": name,
                "description": description,
                "path": title,
                "rules": rules,
            }
            # Fix: write UTF-8 explicitly — ensure_ascii=False output would
            # otherwise break on platforms with a non-UTF-8 default encoding.
            with open(CGROUPS_DIR / f"{pname}.json", "w", encoding="utf-8") as f:
                f.write(json.dumps(cgroup, indent=2, ensure_ascii=False))
            if not rules:
                logger.warning(f"0 rules found in {title}")
                emptys.append(title)
        except KeyError:
            fails.append(title)
            logger.warning(f"Skip {title} (missing metadata)")
        except Exception:
            fails.append(title)
            logger.exception(f"Error when processing {title}")
    logger.info(
        f"{nth} group(s) successfully fetched with {len(emptys)} empty, {len(fails)} failed."
    )
    if emptys:
        logger.info("empty: " + ", ".join(emptys))
    if fails:
        logger.info("fail: " + ", ".join(fails))
# Allow importing this module (e.g. for the parsers) without starting a scrape.
if __name__ == "__main__":
    main()