geoipsed 0.2.2

Inline decoration of IPv4 and IPv6 address geolocations
Documentation
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "geoip2>=5.2.0",
#     "maxminddb>=3.0.0",
# ]
# ///
import re
import sys
from functools import lru_cache

from geoip2.database import Reader
from geoip2.errors import AddressNotFoundError
from maxminddb import MODE_MMAP_EXT, InvalidDatabaseError

# ipv4 - copied from cyberchef.org minus the cidr mask
# ipv6 - https://gist.github.com/dfee/6ed3a4b05cfe7a6faf40a2102408d5d8
IPRE = re.compile(
    r"""
    (
        (?:(?:\d|[01]?\d\d|2[0-4]\d|25[0-5])\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d|\d)
    )
    |
    (
        (?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,4}:[^\\s:](?:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])))|(?:::(?:ffff(?::0{1,4}){0,1}:){0,1}[^\\s:](?:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])))|(?:fe80:(?::(?:(?:[0-9a-fA-F]){1,4})){0,4}%[0-9a-zA-Z]{1,})|(?::(?:(?::(?:(?:[0-9a-fA-F]){1,4})){1,7}|:))|(?:(?:(?:[0-9a-fA-F]){1,4}):(?:(?::(?:(?:[0-9a-fA-F]){1,4})){1,6}))|(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,2}(?::(?:(?:[0-9a-fA-F]){1,4})){1,5})|(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,3}(?::(?:(?:[0-9a-fA-F]){1,4})){1,4})|(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,4}(?::(?:(?:[0-9a-fA-F]){1,4})){1,3})|(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,5}(?::(?:(?:[0-9a-fA-F]){1,4})){1,2})|(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,6}:(?:(?:[0-9a-fA-F]){1,4}))|(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,7}:)|(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){7,7}(?:(?:[0-9a-fA-F]){1,4}))
    )""",
    flags=re.VERBOSE,
)

# Globals are slow, should be made local if possible when comparing performance
import os
from pathlib import Path

# Try to find MMDB files in common locations
script_dir = Path(__file__).parent
project_root = script_dir.parent

mmdb_paths = [
    Path("/usr/share/GeoIP"),
    project_root / "tests/maxmind",
]

if "GEOIP_MMDB_DIR" in os.environ:
    mmdb_paths.insert(0, Path(os.environ["GEOIP_MMDB_DIR"]))

mmdb_dir = None
for path in mmdb_paths:
    if (path / "GeoLite2-City.mmdb").exists() and (path / "GeoLite2-ASN.mmdb").exists():
        mmdb_dir = path
        break

if mmdb_dir is None:
    raise FileNotFoundError(
        "Could not find GeoLite2 MMDB files. Set GEOIP_MMDB_DIR or place files in standard locations."
    )

citydb = Reader(str(mmdb_dir / "GeoLite2-City.mmdb"), mode=MODE_MMAP_EXT)
asndb = Reader(str(mmdb_dir / "GeoLite2-ASN.mmdb"), mode=MODE_MMAP_EXT)


@lru_cache()
def iplookup(ip: str) -> str:
    try:
        cityrecord = citydb.city(ip)
    except (TypeError, ValueError, AddressNotFoundError, InvalidDatabaseError):
        # no match; do nothing
        return ip

    try:
        asnrecord = asndb.asn(ip)
    except (TypeError, ValueError, AddressNotFoundError, InvalidDatabaseError):
        # also err; do nothing
        return ip

    asnnum = asnrecord.autonomous_system_number
    asnorg = asnrecord.autonomous_system_organization
    isocode = cityrecord.country.iso_code

    return f"""<{ip}|AS{asnnum}_{asnorg}|{isocode}>""".replace(" ", "_")


# Could be a lambda instead
def ipenrich(matchobj: re.Match[str]) -> str:
    return iplookup(matchobj.group(0))


def main():
    for line in sys.stdin:
        enriched = IPRE.sub(ipenrich, line)
        print(enriched, end="")


if __name__ == "__main__":
    sys.exit(main())