import base64
import logging
import json
import os
import boto3
import rapidjson
def parse_schema_name(path):
    """Derive "<namespace>.<doctype>.<docver>" from a schema file path.

    Expects paths shaped like
    ``.../<namespace>/<doctype>/<doctype>.<docver>.schema.json``.
    """
    parts = path.split("/")
    # namespace is two directories above the schema file itself
    namespace = parts[-3]
    # filename is "<doctype>.<docver>.schema.json"; drop the fixed suffix
    doctype, docver = parts[-1].split(".")[:-2]
    return ".".join([namespace, doctype, docver])
def load_schemas(path):
    """Walk *path* and build a mapping of schema name -> rapidjson.Validator.

    Only files ending in ".schema.json" are loaded; the mapping key is
    derived from each file's path via ``parse_schema_name``.
    """
    validators = {}
    for dirpath, _, filenames in os.walk(path):
        for filename in filenames:
            if not filename.endswith(".schema.json"):
                continue
            fullpath = os.path.join(dirpath, filename)
            with open(fullpath, "r") as handle:
                source = handle.read()
            validators[parse_schema_name(fullpath)] = rapidjson.Validator(source)
    return validators
def get_schema_name(key):
    """Extract "<namespace>.<doc_type>.<doc_version>" from an S3 object key.

    The key is expected to contain Hive-style ``field=value`` path segments,
    e.g. ``.../namespace=telemetry/doc_type=main/doc_version=4/...``.
    """
    fields = {}
    for segment in key.split("/"):
        if "=" in segment:
            field, value = segment.split("=")
            fields[field] = value
    parts = [fields.get("namespace"), fields.get("doc_type"), fields.get("doc_version")]
    return ".".join(parts)
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Run relative to the repository root (one level above this script) so
    # the "schemas" and "data" folders resolve consistently.
    root = os.path.join(os.path.dirname(__file__), "..")
    os.chdir(root)
    schemas = load_schemas("schemas")

    output_folder = "data"
    # makedirs(exist_ok=True) replaces the racy exists()+mkdir() pair.
    os.makedirs(output_folder, exist_ok=True)

    bucket = "telemetry-parquet"
    prefix = "sanitized-landfill-sample/v3/submission_date_s3=20190310"
    s3 = boto3.client("s3")
    # NOTE(review): list_objects returns at most 1000 keys per call; if this
    # prefix ever holds more objects, switch to a paginator.
    objs = s3.list_objects(Bucket=bucket, Prefix=prefix)
    # "Contents" is absent when the prefix matches nothing; default to empty.
    keys = [
        obj["Key"]
        for obj in objs.get("Contents", [])
        if obj["Key"].endswith(".json")
    ]

    for key in keys:
        schema_name = get_schema_name(key)
        if schema_name not in schemas:
            logging.info(f"schema does not exist: {schema_name}")
            continue
        data = (
            s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8").strip()
        )
        lines = data.split("\n")
        with open(f"{output_folder}/{schema_name}.ndjson", "w") as fp:
            errors = 0
            for line in lines:
                # Each landfill line wraps the raw document under "content".
                content = json.loads(line).get("content")
                try:
                    # Validator raises ValueError when the document fails
                    # schema validation.
                    schemas[schema_name](content)
                except ValueError:
                    errors += 1
                    continue
                # Re-serialize to emit compact, normalized ndjson.
                fp.write(json.dumps(json.loads(content)) + "\n")
            logging.info(
                f"wrote {len(lines)-errors}, skipped {errors} documents: {schema_name}"
            )
    logging.info("Done!")