# /// script
# requires-python = ">=3.11"
# dependencies = [
# "geoip2>=5.2.0",
# "maxminddb>=3.0.0",
# ]
# ///
import re
import sys
from functools import lru_cache
from geoip2.database import Reader
from geoip2.errors import AddressNotFoundError
from maxminddb import MODE_MMAP_EXT, InvalidDatabaseError
# IPRE matches one IPv4 or IPv6 address anywhere in a string.
#   group 1 - IPv4 dotted quad (copied from cyberchef.org minus the cidr mask)
#   group 2 - IPv6, adapted from
#             https://gist.github.com/dfee/6ed3a4b05cfe7a6faf40a2102408d5d8
# Deviations from the gist, both in the IPv4-embedded alternatives:
#   * escapes are single backslashes; inside a raw string a doubled backslash
#     matches a literal backslash, which made these alternatives unmatchable
#   * the gist's `[^\s:]` before the embedded IPv4 address is dropped because
#     it swallowed the first digit of that address (breaking e.g. ::ffff:1.2.3.4)
# The IPv6 alternatives are ordered so the more specific forms are tried first.
IPRE = re.compile(
    r"""
    (
        (?:(?:\d|[01]?\d\d|2[0-4]\d|25[0-5])\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d|\d)
    )
    |
    (
        # 2001:db8:3:4::192.0.2.33, 64:ff9b::192.0.2.33 (IPv4-embedded)
        (?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,4}:(?:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])))
        # ::192.0.2.33, ::ffff:192.0.2.33, ::ffff:0:192.0.2.33 (IPv4-mapped / translated)
        |(?:::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])))
        # fe80::7:8%eth0 (link-local with zone index)
        |(?:fe80:(?::(?:(?:[0-9a-fA-F]){1,4})){0,4}%[0-9a-zA-Z]{1,})
        # ::2:3:4:5:6:7:8, ::8, ::
        |(?::(?:(?::(?:(?:[0-9a-fA-F]){1,4})){1,7}|:))
        # 1::3:4:5:6:7:8, 1::8
        |(?:(?:(?:[0-9a-fA-F]){1,4}):(?:(?::(?:(?:[0-9a-fA-F]){1,4})){1,6}))
        # 1::4:5:6:7:8, 1:2::8
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,2}(?::(?:(?:[0-9a-fA-F]){1,4})){1,5})
        # 1::5:6:7:8, 1:2:3::8
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,3}(?::(?:(?:[0-9a-fA-F]){1,4})){1,4})
        # 1::6:7:8, 1:2:3:4::8
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,4}(?::(?:(?:[0-9a-fA-F]){1,4})){1,3})
        # 1::7:8, 1:2:3:4:5::8
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,5}(?::(?:(?:[0-9a-fA-F]){1,4})){1,2})
        # 1::8, 1:2:3:4:5:6::8
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,6}:(?:(?:[0-9a-fA-F]){1,4}))
        # 1::, 1:2:3:4:5:6:7::
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,7}:)
        # 1:2:3:4:5:6:7:8
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){7,7}(?:(?:[0-9a-fA-F]){1,4}))
    )""",
    flags=re.VERBOSE,
)
# The author's note: globals are slow; bind these to locals where possible
# when comparing performance.
import os
from pathlib import Path

# Candidate directories that may hold the GeoLite2 databases, in priority order.
script_dir = Path(__file__).parent
project_root = script_dir.parent
mmdb_paths = [
    Path("/usr/share/GeoIP"),
    project_root / "tests/maxmind",
]

# A GEOIP_MMDB_DIR environment variable, when set, is searched first.
_env_mmdb_dir = os.environ.get("GEOIP_MMDB_DIR")
if _env_mmdb_dir is not None:
    mmdb_paths = [Path(_env_mmdb_dir), *mmdb_paths]

# Use the first directory that contains BOTH database files; fail at import
# time if none does.
for path in mmdb_paths:
    if all(
        (path / db_name).exists()
        for db_name in ("GeoLite2-City.mmdb", "GeoLite2-ASN.mmdb")
    ):
        mmdb_dir = path
        break
else:
    raise FileNotFoundError(
        "Could not find GeoLite2 MMDB files. Set GEOIP_MMDB_DIR or place files in standard locations."
    )

# Module-level readers shared by every lookup.
citydb = Reader(str(mmdb_dir / "GeoLite2-City.mmdb"), mode=MODE_MMAP_EXT)
asndb = Reader(str(mmdb_dir / "GeoLite2-ASN.mmdb"), mode=MODE_MMAP_EXT)
@lru_cache()
def iplookup(ip: str) -> str:
    """Return *ip* annotated with its ASN and country, or *ip* unchanged.

    The city database is queried first, then the ASN database. If either
    query raises TypeError, ValueError, AddressNotFoundError or
    InvalidDatabaseError, the input string is returned as-is. Otherwise the
    result has the form ``<ip|AS<number>_<organization>|<iso country code>>``
    with every space replaced by an underscore.

    Results are memoised by ``lru_cache`` keyed on the input string.
    """
    try:
        # Order matters: a failed city lookup must skip the ASN lookup.
        city = citydb.city(ip)
        asn = asndb.asn(ip)
    except (TypeError, ValueError, AddressNotFoundError, InvalidDatabaseError):
        # No usable record; leave the address untouched.
        return ip

    number = asn.autonomous_system_number
    organization = asn.autonomous_system_organization
    country = city.country.iso_code
    tag = f"<{ip}|AS{number}_{organization}|{country}>"
    # Spaces come from organization names; underscores keep the tag one token.
    return tag.replace(" ", "_")
# Thin adapter for re.sub; a lambda would work just as well.
def ipenrich(matchobj: re.Match[str]) -> str:
    """Substitution callback: replace the matched text with its lookup result."""
    matched_text = matchobj[0]
    return iplookup(matched_text)
def main():
    """Copy stdin to stdout, rewriting every IPRE match through ipenrich.

    Lines are processed one at a time and written back without adding a
    newline, so the input's own line endings are preserved.
    """
    for raw_line in sys.stdin:
        sys.stdout.write(IPRE.sub(ipenrich, raw_line))
# Run the filter only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())