import os
import sys
import copy
import json
import time
import shlex
import errno
import timeit
import socket
import psutil
import hashlib
import threading
import subprocess
import requests
import traceback
from queue import Queue
from datetime import datetime
from requests.auth import HTTPBasicAuth
from optparse import OptionParser
# Default "host:port" probe targets; used as the defaults of --cu / --ct / --cm.
CU = "cu.tz.cloudcpp.com:80"
CT = "ct.tz.cloudcpp.com:80"
CM = "cm.tz.cloudcpp.com:80"

# Interface-name fragments. NOTE(review): not referenced anywhere in the
# visible part of this file (the --exclude-iface default is spelled out
# separately) -- confirm whether it is still needed.
IFACE_IGNORE_LIST = [
    "lo",
    "tun",
    "docker",
    "vnet",
    "veth",
    "vmbr",
    "kube",
    "br-",
]

# 'ipv4' makes the probe threads resolve host names with AF_INET; any other
# value makes them use AF_INET6.
PROBE_PROTOCOL_PREFER = 'ipv4'
# Capacity of the per-target history of probe results.
PING_PACKET_HISTORY_LEN = 100
# Seconds slept between iterations of the background collector loops.
INTERVAL = 1

# Filled in at runtime (refresh_ip_info() / main()) and read by sample().
G_IP_INFO = None
G_SYS_INFO = None
def get_uptime():
    """Return the whole seconds elapsed since the boot time reported by psutil."""
    now = time.time()
    return int(now - psutil.boot_time())
def get_memory():
    """Return (memory_total, memory_used, swap_total, swap_used).

    Each figure is the corresponding psutil value divided by 1024 and
    truncated to an int.
    """
    virtual = psutil.virtual_memory()
    swap = psutil.swap_memory()
    raw_values = (virtual.total, virtual.used, swap.total, swap.used)
    return tuple(int(value / 1024.0) for value in raw_values)
def get_hdd():
    """Return (total, used) disk space summed over the recognised partitions.

    Only partitions whose filesystem type is in the allow-list below are
    considered, and every device is counted once (the first mountpoint that
    psutil lists for it wins). Both results are the summed psutil usage
    figures divided by 1024 twice and truncated to int.

    A mountpoint whose usage cannot be read is skipped, so a single
    unreadable or vanished mount no longer aborts the whole sample.
    """
    valid_fs = {
        "ext4", "ext3", "ext2", "reiserfs", "jfs", "btrfs",
        "fuseblk", "zfs", "simfs", "ntfs", "fat32", "exfat", "xfs",
    }

    # device -> mountpoint, keeping only the first mountpoint seen per device.
    mountpoints = {}
    for part in psutil.disk_partitions():
        if part.device not in mountpoints and part.fstype.lower() in valid_fs:
            mountpoints[part.device] = part.mountpoint

    size, used = 0, 0
    for mountpoint in mountpoints.values():
        try:
            usage = psutil.disk_usage(mountpoint)
        except OSError:
            # Best effort: leave this mount out of the totals.
            continue
        size += usage.total
        used += usage.used
    return int(size / 1024.0 / 1024.0), int(used / 1024.0 / 1024.0)
def get_cpu(options):
    """Return psutil's CPU utilisation percentage.

    The measurement window is ``options.interval`` coerced to int and passed
    to psutil as ``interval``.
    """
    window = int(options.interval)
    return psutil.cpu_percent(interval=window)
def get_sys_traffic(options):
    """Return (net_in, net_out) summed over all interfaces not skipped.

    Interfaces are filtered with skip_iface(); for the remaining ones field 1
    of psutil's per-NIC counters is added to the inbound total and field 0 to
    the outbound total.
    """
    received, sent = 0, 0
    per_nic = psutil.net_io_counters(pernic=True)
    for iface, counters in per_nic.items():
        if not skip_iface(iface, options):
            received += counters[1]
            sent += counters[0]
    return received, sent
def get_vnstat_traffic(options):
    """Return (network_in, network_out, m_network_in, m_network_out) from vnstat.

    Runs ``/usr/bin/vnstat --json m`` and sums, over every interface not
    rejected by skip_iface(), the overall rx/tx totals and the rx/tx of the
    monthly entry matching the current year and month.

    When the report's ``jsonversion`` is "1" the interface name is read from
    ``id``, the monthly list from ``months`` and every value is multiplied by
    1024; otherwise ``name`` / ``month`` are used and values are taken as is.
    """
    today = datetime.now()
    raw_report = subprocess.check_output(
        "/usr/bin/vnstat --json m", shell=True)
    report = json.loads(raw_report)

    if report.get("jsonversion", "2") == "1":
        name_key, months_key, factor = "id", "months", 1024
    else:
        name_key, months_key, factor = "name", "month", 1

    total_in = total_out = month_in = month_out = 0
    for iface in report.get("interfaces", []):
        if skip_iface(iface[name_key], options):
            continue
        traffic = iface["traffic"]
        total_out += traffic["total"]["tx"] * factor
        total_in += traffic["total"]["rx"] * factor
        for entry in traffic.get(months_key, []):
            # Only the entry for the current calendar month contributes.
            if entry["date"]["year"] == today.year and entry["date"]["month"] == today.month:
                month_out += entry["tx"] * factor
                month_in += entry["rx"] * factor
    return (total_in, total_out, month_in, month_out)
def tupd():
    """Return (tcp, udp, process, thread) counts gathered through psutil.

    tcp / udp are the lengths of psutil's connection lists for those kinds,
    process is the number of processes iterated and thread is the sum of
    their thread counts. A process whose thread count cannot be read is
    still counted as a process but adds no threads.

    If anything else fails, the traceback is printed and (0, 0, 0, 0) is
    returned.
    """
    try:
        tcp_count = len(psutil.net_connections("tcp"))
        udp_count = len(psutil.net_connections("udp"))
        proc_count, thread_count = 0, 0
        for proc in psutil.process_iter():
            proc_count += 1
            try:
                thread_count += proc.num_threads()
            except Exception:
                # Best effort per process. Catching Exception rather than
                # using a bare except lets Ctrl-C / SystemExit propagate.
                pass
        return tcp_count, udp_count, proc_count, thread_count
    except Exception:
        traceback.print_exc()
        return 0, 0, 0, 0
def get_network(ip_version):
    """Return True if a TCP connection to the probe host for ``ip_version`` succeeds.

    ``ip_version`` 4 probes ipv4.google.com and 6 probes ipv6.google.com, both
    on port 80 with a 2 second timeout. Any other value returns False without
    touching the network. Connection or resolution failures return False.
    """
    if ip_version == 4:
        domain = "ipv4.google.com"
    elif ip_version == 6:
        domain = "ipv6.google.com"
    else:
        # Previously this case only "worked" through a NameError swallowed by
        # a bare except; make it explicit.
        return False

    try:
        # The context manager closes the socket even if something fails later.
        with socket.create_connection((domain, 80), 2):
            return True
    except OSError:
        return False
# Shared state written by the background threads and read by sample().
# The keys '10010' / '189' / '10086' are the marks passed to the three probe
# threads (targets options.cu / options.ct / options.cm respectively).

# mark -> loss ratio (lost probes / recorded probes).
G_LOST_RATE = dict.fromkeys(('10010', '189', '10086'), 0.0)

# mark -> duration of the latest answered probe, in milliseconds.
G_PING_TIME = dict.fromkeys(('10010', '189', '10086'), 0)

# Bookkeeping of the throughput thread: latest rates (netrx / nettx), the
# timestamp and length of the last measuring step (clock / diff) and the
# previous cumulative byte counters (avgrx / avgtx).
G_NET_SPEED = dict(
    netrx=0.0,
    nettx=0.0,
    clock=0.0,
    diff=0.0,
    avgrx=0,
    avgtx=0,
)
def _ping_thread(target, mark):
    """Probe ``target`` forever with TCP connects and publish the results under ``mark``.

    Args:
        target: "host:port". The host may be a name, an IPv4 address or an
            IPv6 literal (optionally in square brackets).
        mark: key under which results are stored in G_PING_TIME / G_LOST_RATE.

    Every INTERVAL seconds a TCP connection is attempted with a 1 second
    timeout. A successful connect or an ECONNREFUSED counts as an answer and
    its duration in milliseconds is stored in G_PING_TIME[mark]; any other
    socket error counts as a lost probe. The last PING_PACKET_HISTORY_LEN
    results are kept, and once more than 30 are recorded G_LOST_RATE[mark] is
    set to lost / recorded.

    Raises:
        ValueError: if ``target`` contains no ":" separator.
    """
    packet_lost = 0
    packet_queue = Queue(maxsize=PING_PACKET_HISTORY_LEN)

    # Split on the LAST colon so an IPv6 literal keeps its own colons in the
    # host part (a plain split(":") raised ValueError for such targets).
    host, port = target.rsplit(":", 1)
    host = host.strip("[]")

    ip = host
    if ":" not in host:
        # Not an IPv6 literal: resolve once, in the preferred address family.
        # On failure keep the original host and let the connect resolve it.
        family = socket.AF_INET if PROBE_PROTOCOL_PREFER == 'ipv4' else socket.AF_INET6
        try:
            ip = socket.getaddrinfo(host, None, family)[0][4][0]
        except Exception:
            traceback.print_exc()

    while True:
        # Make room for the new result; forget a lost probe when it leaves
        # the history window.
        if packet_queue.full():
            if packet_queue.get() == 0:
                packet_lost -= 1

        started = timeit.default_timer()
        try:
            socket.create_connection((ip, port), timeout=1).close()
            G_PING_TIME[mark] = int((timeit.default_timer() - started) * 1000)
            packet_queue.put(1)
        except OSError as error:
            if error.errno == errno.ECONNREFUSED:
                # The connect was actively refused, so it is timed and
                # recorded like an answered probe.
                G_PING_TIME[mark] = int((timeit.default_timer() - started) * 1000)
                packet_queue.put(1)
            else:
                packet_lost += 1
                packet_queue.put(0)

        if packet_queue.qsize() > 30:
            G_LOST_RATE[mark] = float(packet_lost) / packet_queue.qsize()

        time.sleep(INTERVAL)
def _net_speed(options):
    """Continuously derive receive / transmit rates and store them in G_NET_SPEED.

    Each round sums bytes_recv / bytes_sent over the interfaces not rejected
    by skip_iface(), divides the change since the previous round by the time
    elapsed since then, stores the truncated results as ``netrx`` / ``nettx``
    and remembers the new totals and timestamp. Sleeps INTERVAL seconds
    between rounds and never returns.
    """
    while True:
        watched = [
            stats
            for name, stats in psutil.net_io_counters(pernic=True).items()
            if not skip_iface(name, options)
        ]
        total_rx = sum(stats.bytes_recv for stats in watched)
        total_tx = sum(stats.bytes_sent for stats in watched)

        now_clock = time.time()
        elapsed = now_clock - G_NET_SPEED["clock"]
        G_NET_SPEED["diff"] = elapsed
        G_NET_SPEED["clock"] = now_clock
        G_NET_SPEED["netrx"] = int((total_rx - G_NET_SPEED["avgrx"]) / elapsed)
        G_NET_SPEED["nettx"] = int((total_tx - G_NET_SPEED["avgtx"]) / elapsed)
        G_NET_SPEED["avgrx"] = total_rx
        G_NET_SPEED["avgtx"] = total_tx

        time.sleep(INTERVAL)
def start_rt_collect_t(options):
    """Start the background collector threads as daemon threads.

    * unless ``options.disable_ping``: one _ping_thread per probe target
      (options.cu / options.ct / options.cm with marks '10010' / '189' /
      '10086');
    * unless ``options.disable_extra``: refresh_ip_info;
    * always: _net_speed.
    """
    workers = []

    if not options.disable_ping:
        probes = (
            (options.cu, '10010'),
            (options.ct, '189'),
            (options.cm, '10086'),
        )
        for target, mark in probes:
            workers.append(threading.Thread(
                target=_ping_thread,
                kwargs={'target': target, 'mark': mark},
                daemon=True,
            ))

    if not options.disable_extra:
        workers.append(threading.Thread(target=refresh_ip_info, daemon=True))

    workers.append(threading.Thread(
        target=_net_speed,
        kwargs={'options': options},
        daemon=True,
    ))

    # daemon=True replaces the deprecated Thread.setDaemon(True); the threads
    # must not keep the process alive once the main thread exits.
    for worker in workers:
        worker.start()
def byte_str(object):
    """Convert between str and bytes using UTF-8.

    A str is encoded to bytes and bytes are decoded to str. For any other
    type the type is printed and None is returned.
    """
    if isinstance(object, str):
        return object.encode(encoding="utf-8")
    if isinstance(object, bytes):
        return object.decode("utf-8")
    print(type(object))
    return None
def sample(options, stat_base):
    """Build one report payload: a deep copy of ``stat_base`` plus current metrics.

    Collects CPU, uptime, load average (zeros unless 'linux' is in
    sys.platform), memory, swap and disk figures, the rates and probe results
    published by the background threads, and traffic totals from vnstat
    (``options.vnstat``) or psutil. The tcp/udp/process/thread counts are
    zeros when ``options.disable_tupd`` is set. Unless
    ``options.disable_extra`` is set, G_IP_INFO / G_SYS_INFO are attached
    when they are truthy.

    Returns the new dict; ``stat_base`` itself is not modified.
    """
    cpu_percent = int(get_cpu(options))
    uptime = get_uptime()
    if 'linux' in sys.platform:
        load_1, load_5, load_15 = os.getloadavg()
    else:
        load_1, load_5, load_15 = 0.0, 0.0, 0.0
    memory_total, memory_used, swap_total, swap_used = get_memory()
    hdd_total, hdd_used = get_hdd()

    stat_data = copy.deepcopy(stat_base)
    stat_data.update({
        'uptime': uptime,
        'load_1': load_1,
        'load_5': load_5,
        'load_15': load_15,
        'memory_total': memory_total,
        'memory_used': memory_used,
        'swap_total': swap_total,
        'swap_used': swap_used,
        'hdd_total': hdd_total,
        'hdd_used': hdd_used,
        'cpu': cpu_percent,
        'network_rx': G_NET_SPEED.get("netrx"),
        'network_tx': G_NET_SPEED.get("nettx"),
        # Loss ratios are reported as truncated percentages.
        'ping_10010': int(G_LOST_RATE.get('10010') * 100),
        'ping_189': int(G_LOST_RATE.get('189') * 100),
        'ping_10086': int(G_LOST_RATE.get('10086') * 100),
        'time_10010': G_PING_TIME.get('10010'),
        'time_189': G_PING_TIME.get('189'),
        'time_10086': G_PING_TIME.get('10086'),
    })

    if options.vnstat:
        totals = get_vnstat_traffic(options)
        network_in, network_out, m_network_in, m_network_out = totals
        stat_data['network_in'] = network_in
        stat_data['network_out'] = network_out
        # Overall total minus the current month's share.
        stat_data['last_network_in'] = network_in - m_network_in
        stat_data['last_network_out'] = network_out - m_network_out
    else:
        net_in, net_out = get_sys_traffic(options)
        stat_data['network_in'] = net_in
        stat_data['network_out'] = net_out

    counts = (0, 0, 0, 0) if options.disable_tupd else tupd()
    stat_data['tcp'], stat_data['udp'], stat_data['process'], stat_data['thread'] = counts

    if not options.disable_extra:
        if G_IP_INFO:
            stat_data['ip_info'] = G_IP_INFO
        if G_SYS_INFO:
            stat_data['sys_info'] = G_SYS_INFO

    return stat_data
def get_target_network(url):
    """Return (ipv4, ipv6): whether the host of ``url`` resolves to each family.

    The URL is parsed with urllib.parse.urlsplit, so IPv6 literals
    (``http://[::1]:8080/``) and userinfo (``http://user:pw@host/``) are
    handled. If the URL carries no port, 443 is used for https and 80
    otherwise. Every address returned by getaddrinfo is printed.

    Raises:
        ValueError: if no host can be extracted from ``url``.
        socket.gaierror: if the host cannot be resolved.
    """
    from urllib.parse import urlsplit

    parts = urlsplit(url)
    host = parts.hostname
    if not host:
        raise ValueError("no host in url: {!r}".format(url))
    port = parts.port
    if port is None:
        port = 443 if parts.scheme == "https" else 80

    ipv4, ipv6 = False, False
    for family, _, _, _, sockaddr in socket.getaddrinfo(host, port):
        print(family, sockaddr)
        if family == socket.AF_INET:
            ipv4 = True
        elif family == socket.AF_INET6:
            ipv6 = True
    return ipv4, ipv6
def http_report(options, stat_base):
    """Start the collectors and POST samples to ``options.addr`` forever.

    Before the loop, ``stat_base['online4']`` / ``['online6']`` are set from
    get_network(); if both probes fail they are taken from the address
    families the report host resolves to.

    Authentication is HTTP basic auth with ``options.password``. The user is
    ``options.gid`` with header ``ssr-auth: group`` when a gid is given,
    otherwise ``options.username`` with ``ssr-auth: single``.

    Each iteration builds a sample, prints it, posts it as JSON and prints
    the response. On any error other than KeyboardInterrupt the traceback is
    printed, the loop waits 3 seconds and continues with a fresh session.
    This function only returns by raising.
    """
    socket.setdefaulttimeout(5)
    start_rt_collect_t(options)

    online4 = get_network(4)
    online6 = get_network(6)
    if not (online4 or online6):
        print("try get target network type {}".format(options.addr))
        online4, online6 = get_target_network(options.addr)
    stat_base['online4'] = online4
    stat_base['online6'] = online6

    if options.gid:
        ssr_auth, auth_user = "group", options.gid
    else:
        ssr_auth, auth_user = "single", options.username
    http_headers = {"ssr-auth": ssr_auth}
    auth = HTTPBasicAuth(auth_user, options.password)
    print(http_headers, auth)

    # requests applies no timeout unless one is passed explicitly, so without
    # this a stalled server could block the report loop indefinitely.
    request_timeout = 5

    sess = requests.Session()
    while True:
        try:
            stat_data = sample(options, stat_base)
            print(json.dumps(stat_data))
            r = sess.post(options.addr, auth=auth, json=stat_data,
                          headers=http_headers, timeout=request_timeout)
            print(r)
        except KeyboardInterrupt:
            raise
        except Exception:
            traceback.print_exc()
            time.sleep(3)
            # Release the old connection pool before starting over.
            sess.close()
            sess = requests.Session()
# Endpoint queried by refresh_ip_info(); the query string selects the
# response fields and the response language.
IP_API_URL = (
    "http://ip-api.com/json"
    "?fields=status,message,continent,continentCode,country,countryCode,"
    "region,regionName,city,district,zip,lat,lon,timezone,isp,org,as,asname,query"
    "&lang=zh-CN"
)
def refresh_ip_info():
    """Refresh G_IP_INFO from ip-api.com once per hour, forever.

    Each round requests IP_API_URL with a 5 second timeout. G_IP_INFO is
    replaced only when the request succeeds and the response's ``status``
    field is "success"; otherwise the previous value is kept, so a failed
    lookup no longer overwrites good data with "unknown" placeholders.
    Errors are printed and never propagate out of the loop.
    """
    global G_IP_INFO
    while True:
        try:
            r = requests.get(IP_API_URL, timeout=5)
            r.raise_for_status()
            resp = r.json()
            if resp.get("status") == "success":
                G_IP_INFO = {
                    "query": resp.get("query", "unknown"),
                    "source": "ip-api.com",
                    "continent": resp.get("continent", "unknown"),
                    "country": resp.get("country", "unknown"),
                    "region_name": resp.get("regionName", "unknown"),
                    "city": resp.get("city", "unknown"),
                    "isp": resp.get("isp", "unknown"),
                    "org": resp.get("org", "unknown"),
                    "as": resp.get("as", "unknown"),
                    "asname": resp.get("asname", "unknown"),
                    "lat": resp.get("lat", 0),
                    "lon": resp.get("lon", 0),
                    "timezone": resp.get("timezone", "Asia/Shanghai"),
                }
            else:
                print("ip-api.com lookup failed: {}".format(
                    resp.get("message", "unknown")))
        except Exception:
            traceback.print_exc()
        time.sleep(3600)
def get_sys_info(options):
    """Return a dict describing the host, built from platform and cpuinfo.

    ``name`` is ``options.username``; ``version`` and ``os_family`` are fixed
    strings; the CPU fields fall back to 0 / "unknown" when cpuinfo does not
    provide them.
    """
    import platform
    from cpuinfo import get_cpu_info

    cpu = get_cpu_info()
    return {
        "name": options.username,
        "version": "stat_client.py",
        "os_name": platform.system(),
        "os_arch": platform.machine(),
        "os_family": "unknown",
        "os_release": platform.platform(),
        "kernel_version": platform.release(),
        "cpu_num": cpu.get("count", 0),
        "cpu_brand": cpu.get("brand_raw", "unknown"),
        "cpu_vender_id": cpu.get("vendor_id_raw", "unknown"),
        "host_name": platform.node(),
    }
def gen_sys_id(sys_info):
    """Return this host's system id, creating and caching it on first use.

    If ``.server_status_sys_id`` exists in the current working directory its
    stripped content is returned. Otherwise the id is the MD5 hex digest of
    selected ``sys_info`` fields and the psutil boot time joined with "/",
    and it is written to that file before being returned.
    """
    SYS_ID_FILE = ".server_status_sys_id"

    if not os.path.exists(SYS_ID_FILE):
        keys = (
            "host_name",
            "os_name",
            "os_arch",
            "os_family",
            "os_release",
            "kernel_version",
            "cpu_brand",
        )
        parts = [sys_info.get(key, "unknown") for key in keys]
        parts.append(psutil.boot_time())
        fingerprint = "/".join(str(part) for part in parts)
        sys_id = hashlib.md5(fingerprint.encode("utf-8")).hexdigest()
        with open(SYS_ID_FILE, "w") as wf:
            wf.write(sys_id)
        print(f"save sys_id to {SYS_ID_FILE} succ")
        return sys_id

    with open(SYS_ID_FILE, 'r') as f:
        cached = f.read()
    print(f"read sys_id from {SYS_ID_FILE}")
    return cached.strip()
def skip_iface(name, options):
    """Return True if interface ``name`` must be left out of the statistics.

    When ``options.iface`` is non-empty it is an exact-match allow-list and
    ``options.exclude_iface`` is ignored. Otherwise an interface is skipped
    when any entry of ``options.exclude_iface`` is a substring of its name.
    """
    if options.iface:
        return name not in options.iface
    return any(fragment in name for fragment in options.exclude_iface)
def main():
    """Parse the command line, build the static report fields and start reporting.

    Addresses starting with "http" are reported through http_report();
    "grpc" addresses raise RuntimeError and anything else only prints a
    message. RuntimeError is also raised when vnstat is requested on a
    platform whose name starts with "win".
    """
    global G_SYS_INFO

    usage = (
        "usage: python3 %prog [options] arg\n"
        "eg:\n"
        "python3 %prog -a http://127.0.0.1:8080/report -u h1 -p p1\n"
        "python3 %prog -a http://127.0.0.1:8080/report -u h1 -p p1 -n\n"
    )
    parser = OptionParser(usage)
    parser.add_option("-a", "--addr", dest="addr", default="http://127.0.0.1:8080/report",
                      help="http/tcp addr [default: %default]")
    parser.add_option("-u", "--user", dest="username",
                      default="h1", help="auth user [default: %default]")
    parser.add_option("-g", "--gid", dest="gid",
                      default="", help="group id [default: %default]")
    parser.add_option("--alias", dest="alias",
                      default="unknown", help="alias for host [default: %default]")
    parser.add_option("-p", "--pass", dest="password",
                      default="p1", help="auth pass [default: %default]")
    parser.add_option("-n", "--vnstat", default=False,
                      action="store_true", help="enable vnstat [default: %default]")
    parser.add_option("--disable-extra", default=False,
                      action="store_true", help="disable extra info report [default: %default]")
    parser.add_option("--disable-ping", default=False,
                      action="store_true", help="disable ping [default: %default]")
    parser.add_option("--disable-tupd", default=False,
                      action="store_true", help="disable t/u/p/d [default: %default]")
    parser.add_option("--cm", dest="cm", default=CM,
                      help="China Mobile probe addr [default: %default]")
    parser.add_option("--ct", dest="ct", default=CT,
                      help="China Telecom probe addr [default: %default]")
    parser.add_option("--cu", dest="cu", default=CU,
                      help="China Unicom probe addr [default: %default]")
    # type="int" keeps a command-line weight the same JSON type as the
    # default (0); without it "-w 5" was reported as the string "5".
    parser.add_option("-w", "--weight", dest="weight", type="int",
                      default=0, help="weight for rank [default: %default]")
    parser.add_option("--disable-notify", default=False,
                      action="store_true", help="disable notify [default: %default]")
    parser.add_option("-t", "--type", dest="type",
                      default="", help="host type [default: %default]")
    parser.add_option("--location", dest="location",
                      default="", help="location [default: %default]")
    parser.add_option("-i", "--iface", dest="iface",
                      default="", help="iface list, eg: eth0,eth1 [default: %default]")
    parser.add_option("-e", "--exclude-iface", dest="exclude_iface",
                      default="lo,docker,vnet,veth,vmbr,kube,br-",
                      help="exclude iface [default: %default]")
    parser.add_option("--interval", dest="interval",
                      default=1, help="report interval [default: %default]")
    (options, args) = parser.parse_args()

    def parse_iface_list(ifaces):
        """Split a comma separated list, trimming blanks and dropping empty items."""
        return [item for item in map(str.strip, ifaces.split(",")) if item]

    options.iface = parse_iface_list(options.iface)
    options.exclude_iface = parse_iface_list(options.exclude_iface)
    print(json.dumps(options.__dict__, indent=2))

    if options.vnstat and sys.platform.startswith("win"):
        raise RuntimeError("unsupported: enable vnstat on win os")

    sys_info = get_sys_info(options)
    sys_id = gen_sys_id(sys_info)
    print("sys info: {}".format(json.dumps(sys_info, indent=2)))
    print("sys id: {}".format(sys_id))
    if not options.disable_extra:
        G_SYS_INFO = sys_info

    # The default user name "h1" is replaced by the generated system id, and
    # the default alias "unknown" by the resulting name.
    name = sys_id if options.username == "h1" else options.username
    alias = name if options.alias == "unknown" else options.alias

    stat_base = {
        "frame": "data",
        'version': "py",
        'gid': options.gid if len(options.gid) > 0 else "",
        'alias': alias,
        'name': name,
        'weight': options.weight,
        'vnstat': options.vnstat,
        'notify': not options.disable_notify,
    }
    if len(options.type) > 0:
        stat_base['type'] = options.type
    if len(options.location) > 0:
        stat_base['location'] = options.location
    print("stat_base: {}".format(json.dumps(stat_base, indent=2)))

    if options.addr.startswith("http"):
        http_report(options, stat_base)
    elif options.addr.startswith("grpc"):
        raise RuntimeError("grpc unsupported")
    else:
        print("invalid addr scheme")
# Run the client only when this file is executed as a script.
if __name__ == "__main__":
    main()