import ipaddress
from abc import ABC, abstractmethod
from ctypes import *
from ctypes.util import find_library
from typing import Iterable, List, Tuple
MAX_PACKET_SIZE = 2048
_libc = CDLL(find_library("c"))
PAN_LIBRARY_NAME = "pan"
_libpan_path = find_library(PAN_LIBRARY_NAME)
if _libpan_path is None:
raise RuntimeError("Can't find library " + PAN_LIBRARY_NAME)
_libpan = CDLL(_libpan_path)
_ERR_OK = 0
_ERR_FAILED = 1
_ERR_DEADLINE = 2
_ERR_NO_PATH = 3
_ERR_ADDR_SYNTAX = 4
_ERR_ADDR_RESOLUTION = 5
class PanError(Exception):
pass
class OperationFailed(PanError):
pass
class DeadlineExceeded(PanError):
pass
class NoPath(PanError):
pass
class InvalidAddrSyntax(PanError):
pass
class AddrResolutionFailed(PanError):
pass
def _raise_if_error(err):
if err == _ERR_FAILED:
raise OperationFailed("Requested operation failed")
elif err == _ERR_DEADLINE:
raise DeadlineExceeded("I/O deadline exceeded")
elif err == _ERR_NO_PATH:
raise NoPath("No path to destination is known")
elif err == _ERR_ADDR_SYNTAX:
raise InvalidAddrSyntax("Invalid address syntax")
elif err == _ERR_ADDR_RESOLUTION:
raise AddrResolutionFailed("Address resolution failed")
_PolicyFilterFn = CFUNCTYPE(c_void_p, POINTER(c_void_p), c_size_t, c_void_p)
class _PathSelectorCB(Structure):
PathFn = CFUNCTYPE(c_void_p, c_void_p)
InitializeFn = CFUNCTYPE(None, c_void_p, c_void_p, POINTER(c_void_p), c_size_t, c_void_p)
RefreshFn = CFUNCTYPE(None, POINTER(c_void_p), c_size_t, c_void_p)
PathDownFn = CFUNCTYPE(None, c_void_p, c_void_p, c_void_p)
CloseFn = CFUNCTYPE(None, c_void_p)
_fields_ = [
("path", PathFn),
("initialize", InitializeFn),
("refresh", RefreshFn),
("path_down", PathDownFn),
("close", CloseFn)
]
class _ReplySelectorCB(Structure):
PathFn = CFUNCTYPE(c_void_p, c_void_p, c_void_p)
InitializeFn = CFUNCTYPE(None, c_void_p, c_void_p)
RecordFn = CFUNCTYPE(None, c_void_p, c_void_p, c_void_p)
PathDownFn = CFUNCTYPE(None, c_void_p, c_void_p, c_void_p)
CloseFn = CFUNCTYPE(None, c_void_p)
_fields_ = [
("path", PathFn),
("initialize", InitializeFn),
("record", RecordFn),
("path_down", PathDownFn),
("close", CloseFn),
]
_free = _libc.free
_free.argtypes = [c_void_p]
_free.restype = None
_duplicate_handle = _libpan.PanDuplicateHandle
_duplicate_handle.argtypes = [c_void_p]
_duplicate_handle.restype = c_void_p
_delete_handle = _libpan.PanDeleteHandle
_delete_handle.argtypes = [c_void_p]
_delete_handle.restype = None
_udp_addr_new = _libpan.PanUDPAddrNew
_udp_addr_new.argtypes = [POINTER(c_uint64), POINTER(c_ubyte), c_int, c_uint16]
_udp_addr_new.restype = c_void_p
_udp_addr_get_ia = _libpan.PanUDPAddrGetIA
_udp_addr_get_ia.argtypes = [c_void_p, POINTER(c_uint64)]
_udp_addr_get_ia.restype = None
_udp_addr_is_ipv6 = _libpan.PanUDPAddrIsIPv6
_udp_addr_is_ipv6.argtypes = [c_void_p]
_udp_addr_is_ipv6.restype = c_int
_udp_addr_get_ipv4 = _libpan.PanUDPAddrGetIPv4
_udp_addr_get_ipv4.argtypes = [c_void_p, 4*c_ubyte]
_udp_addr_get_ipv4.restype = c_uint32
_udp_addr_get_ipv6 = _libpan.PanUDPAddrGetIPv6
_udp_addr_get_ipv6.argtypes = [c_void_p, 16*c_ubyte]
_udp_addr_get_ipv6.restype = c_uint32
_udp_addr_get_port = _libpan.PanUDPAddrGetPort
_udp_addr_get_port.argtypes = [c_void_p]
_udp_addr_get_port.restype = c_uint16
_udp_addr_to_string = _libpan.PanUDPAddrToString
_udp_addr_to_string.argtypes = [c_void_p]
_udp_addr_to_string.restype = c_void_p
_path_to_string = _libpan.PanPathToString
_path_to_string.argtypes = [c_void_p]
_path_to_string.restype = c_void_p
_path_get_fingerprint = _libpan.PanPathGetFingerprint
_path_get_fingerprint.argtypes = [c_void_p]
_path_get_fingerprint.restype = c_void_p
_path_contains_interface = _libpan.PanPathContainsInterface
_path_contains_interface.argtypes = [c_void_p]
_path_contains_interface.restype = c_int
_path_fingerprint_are_equal = _libpan.PanPathFingerprintAreEqual
_path_fingerprint_are_equal.argtypes = [c_void_p, c_void_p]
_path_fingerprint_are_equal.restype = c_int
_new_c_policy = _libpan.PanNewCPolicy
_new_c_policy.argtypes = [_PolicyFilterFn, c_void_p]
_new_c_policy.restype = c_void_p
_new_c_selector = _libpan.PanNewCSelector
_new_c_selector.argtypes = [POINTER(_PathSelectorCB), c_void_p]
_new_c_selector.restype = c_void_p
_new_c_reply_selector = _libpan.PanNewCReplySelector
_new_c_reply_selector.argtypes = [POINTER(_ReplySelectorCB), c_void_p]
_new_c_reply_selector.restype = c_void_p
_path_contains_interface.argtypes = [c_void_p]
_path_contains_interface.restype = c_int
class OwningHandle:
def __init__(self, handle: int|c_void_p):
if isinstance(handle, int):
self._handle = handle
elif isinstance(handle, c_void_p):
self._handle = handle.value
else:
assert False
def __del__(self):
self.delete()
@property
def _as_parameter_(self) -> int:
assert self._handle is not None
return self._handle
@property
def handle(self) -> int:
return self._handle
def delete(self):
if self._handle is not None:
_delete_handle(self._handle)
self._handle = None
def copy(self) -> 'OwningHandle':
return OwningHandle(_duplicate_handle(self._handle))
class Handle:
def __init__(self, handle: int|c_void_p):
if isinstance(handle, int):
self._handle = handle
elif isinstance(handle, c_void_p):
self._handle = handle.value
else:
assert False
@property
def _as_parameter_(self) -> c_void_p:
assert self._handle is not None
return self._handle
@property
def handle(self) -> int:
return self._handle
def delete(self):
self._handle = None
def copy(self) -> OwningHandle:
return OwningHandle(_duplicate_handle(self._handle))
class UDPAddress:
def __init__(self, handle: Handle|OwningHandle):
assert isinstance(handle, (Handle, OwningHandle))
self._handle = handle
@property
def _as_parameter_(self) -> int:
return self._handle.handle
@staticmethod
def build(isd: int, asn: int,
ip: ipaddress.IPv4Address | ipaddress.IPv6Address,
port: int) -> 'UDPAddress':
ia = c_uint64(int.from_bytes((asn | (isd << 48)).to_bytes(8, 'big'), 'little'))
buf = (16*c_ubyte)(*ip.packed)
ip_len = 4 if ip.version == 4 else 16
new_addr = _udp_addr_new(byref(ia), buf, ip_len, port)
return UDPAddress(OwningHandle(new_addr))
def __str__(self):
ptr = _udp_addr_to_string(self._handle)
if not ptr:
raise PanError("Converting UDPAddress to string failed")
result = string_at(ptr)
_free(ptr)
return result.decode()
def get_ia(self) -> Tuple[int, int]:
ia = c_uint64()
_udp_addr_get_ia(self._handle, byref(ia))
buf = bytes(ia)
isd = int.from_bytes(buf[:2], byteorder='big')
asn = int.from_bytes(buf[2:], byteorder='big')
return (isd, asn)
def get_ip(self) -> ipaddress.IPv4Address | ipaddress.IPv6Address:
if _udp_addr_is_ipv6(self._handle) == 0:
addr = (4*c_ubyte)()
err = _udp_addr_get_ipv4(self._handle, addr)
_raise_if_error(err)
return ipaddress.ip_address(int.from_bytes(bytes(addr), 'big'))
else:
addr = (16*c_ubyte)()
err = _udp_addr_get_ipv6(self._handle, addr)
_raise_if_error(err)
return ipaddress.ip_address(int.from_bytes(bytes(addr), 'big'))
def get_port(self) -> int:
return int(_udp_addr_get_port(self._handle))
class PathInterface:
def __init__(self, handle: Handle|OwningHandle):
assert isinstance(handle, (Handle, OwningHandle))
self._handle = handle
@property
def _as_parameter_(self) -> int:
return self._handle.handle
class PathFingerprint:
def __init__(self, handle: Handle|OwningHandle):
assert isinstance(handle, (Handle, OwningHandle))
self._handle = handle
@property
def _as_parameter_(self) -> int:
return self._handle.handle
def __eq__(self, other):
if not isinstance(other, PathFingerprint):
return False
if _path_fingerprint_are_equal(self._handle, other._handle) != 0:
return True
else:
return False
def __nq__(self, other):
return not (self == other)
class Path:
def __init__(self, handle: Handle|OwningHandle):
assert isinstance(handle, (Handle, OwningHandle))
self._handle = handle
@property
def _as_parameter_(self) -> int:
return self._handle.handle
def __str__(self):
ptr = _path_to_string(self._handle)
if not ptr:
raise PanError("Converting Path to string failed")
result = string_at(ptr)
_free(ptr)
return result.decode()
def __contains__(self, interface):
if not isinstance(interface, PathInterface):
return False
if _path_contains_interface(self._handle, interface._handle) != 0:
return True
else:
return False
def fingerprint(self) -> PathFingerprint:
return PathFingerprint(OwningHandle(_path_get_fingerprint(self._handle)))
class PathPolicy(ABC):
def __init__(self):
def _filter(paths, count, user):
pypaths = [Path(Handle(path)) for path in paths[:count]]
filtered = self.filter(pypaths)
try:
assert len(filtered) <= count
for i, path in enumerate(filtered):
assert isinstance(path, Path)
handle = path._handle.handle
assert isinstance(handle, int)
paths[i] = handle
return len(filtered)
except AssertionError:
return
self._callback = _PolicyFilterFn(_filter)
self._handle = OwningHandle(_new_c_policy(self._callback, None))
@property
def _as_parameter_(self) -> int:
return self._handle.handle
def delete(self):
self._handle.delete()
@abstractmethod
def filter(self, paths: List[Path]) -> Iterable[Path]:
pass
class PathSelector(ABC):
def __init__(self):
def _path(user):
result = self.path()
if isinstance(result, Path):
handle = result._handle.handle
if isinstance(handle, int):
return handle
return 0
def _initialize(local, remote, paths, count, user):
self.initialize(
UDPAddress(OwningHandle(local)),
UDPAddress(OwningHandle(remote)),
[Path(OwningHandle(path)) for path in paths[:count]]
)
def _refresh(paths, count, user):
self.refresh([Path(OwningHandle(path)) for path in paths[:count]])
def _path_down(pf, pi, user):
self.path_down(
PathFingerprint(OwningHandle(pf)),
PathInterface(OwningHandle(pi)),
)
self._callbacks = _PathSelectorCB(
_PathSelectorCB.PathFn(_path),
_PathSelectorCB.InitializeFn(_initialize),
_PathSelectorCB.RefreshFn(_refresh),
_PathSelectorCB.PathDownFn(_path_down),
_PathSelectorCB.CloseFn(lambda _: self.close())
)
self._handle = OwningHandle(_new_c_selector(byref(self._callbacks), None))
@property
def _as_parameter_(self) -> int:
return self._handle.handle
def delete(self):
self._handle.delete()
@abstractmethod
def path(self) -> Path:
pass
@abstractmethod
def initialize(self, local: UDPAddress, remote: UDPAddress, paths: List[Path]) -> None:
pass
@abstractmethod
def refresh(self, paths: List[Path]) -> None:
pass
@abstractmethod
def path_down(self, pf: PathFingerprint, pi: PathInterface) -> None:
pass
@abstractmethod
def close(self) -> None:
pass
class ReplySelector(ABC):
def __init__(self):
def _path(remote, user):
result = self.path(UDPAddress(OwningHandle(remote)))
if isinstance(result, Path):
handle = result._handle.handle
if isinstance(handle, int):
return handle
return 0
def _initialize(local, user):
self.initialize(UDPAddress(OwningHandle(local)))
def _record(remote, path, user):
self.record(
UDPAddress(OwningHandle(remote)),
Path(OwningHandle(path))
)
def _path_down(pf, pi, user):
self.path_down(
PathFingerprint(OwningHandle(pf)),
PathInterface(OwningHandle(pi)),
)
self._callbacks = _ReplySelectorCB(
_ReplySelectorCB.PathFn(_path),
_ReplySelectorCB.InitializeFn(_initialize),
_ReplySelectorCB.RecordFn(_record),
_ReplySelectorCB.PathDownFn(_path_down),
_ReplySelectorCB.CloseFn(lambda _: self.close())
)
self._handle = OwningHandle(_new_c_reply_selector(byref(self._callbacks), None))
@property
def _as_parameter_(self) -> int:
return self._handle.handle
def delete(self):
self._handle.delete()
@abstractmethod
def path(self, remote: UDPAddress) -> Path:
pass
@abstractmethod
def initialize(self, local: UDPAddress) -> None:
pass
@abstractmethod
def record(self, remote: UDPAddress, path: Path) -> None:
pass
@abstractmethod
def path_down(self, pf: PathFingerprint, pi: PathInterface) -> None:
pass
@abstractmethod
def close(self) -> None:
pass