from maxminddb import MODE_MMAP_EXT, InvalidDatabaseError
from geoip2.errors import AddressNotFoundError
from geoip2.database import Reader
from functools import lru_cache
import sys
import re
# ipv4 - copied from cyberchef.org minus the cidr mask
# ipv6 - https://gist.github.com/dfee/6ed3a4b05cfe7a6faf40a2102408d5d8
# Matches one IPv4 address (capture group 1) or one IPv6 address (capture
# group 2). The pattern is compiled with re.VERBOSE, so the line breaks and
# indentation inside it are ignored by the regex engine.
#
# The pattern is a raw string, so every regex escape takes a SINGLE backslash.
# The two IPv4-embedded IPv6 alternatives used to be written with doubled
# backslashes ("\\s", "\\."), which in a raw string mean "backslash or 's'"
# and "a literal backslash followed by any character". As a result addresses
# such as ::ffff:192.0.2.33 or 64:ff9b::192.0.2.33 were cut off at the first
# dot and a different address was matched.
#
# NOTE(review): "[^\s:]" in those two alternatives consumes one character
# before the dotted quad, so a single-digit first octet (::ffff:1.2.3.4) is
# still not matched as a whole, and a non-digit may be swallowed there.
# Left as-is because it mirrors the pattern's upstream source -- confirm
# whether that is intended.
IPRE = re.compile(
    r"""
    (
        (?:(?:\d|[01]?\d\d|2[0-4]\d|25[0-5])\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d|\d)
    )
    |
    (
        (?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,4}:[^\s:](?:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])))
        |(?:::(?:ffff(?::0{1,4}){0,1}:){0,1}[^\s:](?:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])))
        |(?:fe80:(?::(?:(?:[0-9a-fA-F]){1,4})){0,4}%[0-9a-zA-Z]{1,})
        |(?::(?:(?::(?:(?:[0-9a-fA-F]){1,4})){1,7}|:))
        |(?:(?:(?:[0-9a-fA-F]){1,4}):(?:(?::(?:(?:[0-9a-fA-F]){1,4})){1,6}))
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,2}(?::(?:(?:[0-9a-fA-F]){1,4})){1,5})
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,3}(?::(?:(?:[0-9a-fA-F]){1,4})){1,4})
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,4}(?::(?:(?:[0-9a-fA-F]){1,4})){1,3})
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,5}(?::(?:(?:[0-9a-fA-F]){1,4})){1,2})
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,6}:(?:(?:[0-9a-fA-F]){1,4}))
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){1,7}:)
        |(?:(?:(?:(?:[0-9a-fA-F]){1,4}):){7,7}(?:(?:[0-9a-fA-F]){1,4}))
    )""",
    flags=re.VERBOSE,
)
# GeoLite2 readers shared by every lookup in this module. Both are opened once,
# when the module is imported, from hard-coded paths under /usr/share/GeoIP,
# with MODE_MMAP_EXT passed as the reader mode (presumably the memory-mapped
# C-extension reader -- confirm against the maxminddb documentation).
# Globals are slow, should be made local if possible when comparing performance
citydb = Reader("/usr/share/GeoIP/GeoLite2-City.mmdb", mode=MODE_MMAP_EXT)
asndb = Reader("/usr/share/GeoIP/GeoLite2-ASN.mmdb", mode=MODE_MMAP_EXT)
@lru_cache()
def iplookup(ip: str) -> str:
    """Return *ip* annotated with its ASN and country code, or *ip* unchanged.

    The address is looked up in the city database first and in the ASN
    database second. If either lookup raises one of the handled errors, the
    input string is returned untouched. Otherwise the result has the form
    ``<ip|AS<number>_<organization>|<iso_code>>`` with every space replaced
    by an underscore.

    Results are memoized by ``lru_cache`` with its default size, so repeated
    addresses do not hit the databases again.
    """
    try:
        # Order matters only for which database is consulted first; a failure
        # in either one yields the bare address.
        city = citydb.city(ip)
        asn = asndb.asn(ip)
    except (TypeError, ValueError, AddressNotFoundError, InvalidDatabaseError):
        return ip

    number = asn.autonomous_system_number
    organization = asn.autonomous_system_organization
    country_code = city.country.iso_code
    annotated = f"<{ip}|AS{number}_{organization}|{country_code}>"
    # Organization names may contain spaces; keep the tag a single token.
    return annotated.replace(" ", "_")
# Could be a lambda instead
def ipenrich(matchobj: re.Match[str]) -> str:
    """Replacement callback for ``re.sub``: enrich the full matched text.

    Passes the whole match (group 0) to ``iplookup`` and returns its result.
    """
    matched_text = matchobj[0]
    return iplookup(matched_text)
def main():
    """Copy stdin to stdout, rewriting every address matched by ``IPRE``.

    Each input line is passed through ``IPRE.sub`` with ``ipenrich`` as the
    replacement callback and written back without adding a newline (the line
    keeps its own terminator). Returns ``None``, so the process exit status
    is 0 on normal completion.
    """
    for raw_line in sys.stdin:
        sys.stdout.write(IPRE.sub(ipenrich, raw_line))


if __name__ == "__main__":
    sys.exit(main())