import socket
import sys
def test_cache_operations(host='127.0.0.1', port=11211):
def send_command(sock, command, expect_data=False):
try:
sock.sendall(command.encode() + b'\r\n')
response = b''
if expect_data:
while True:
chunk = sock.recv(1024)
if not chunk:
break
response += chunk
if b'END\r\n' in response:
break
else:
response = sock.recv(1024)
return response.decode().strip()
except Exception as e:
print(f"命令执行失败: {e}")
return ""
print("测试缓存功能...")
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
print("连接服务器成功")
set_cmd = "set test_key 0 60 11\r\nhello world"
response = send_command(sock, set_cmd)
if response == "STORED":
print("SET命令成功")
else:
print(f"SET命令失败: {response}")
return False
get_cmd = "get test_key"
response = send_command(sock, get_cmd, expect_data=True)
if "hello world" in response and "VALUE" in response:
print("GET命令成功")
print(f" 获取的值: hello world")
else:
print(f"GET命令失败: {response}")
return False
delete_cmd = "delete test_key"
response = send_command(sock, delete_cmd)
if response == "DELETED":
print("DELETE命令成功")
else:
print(f"DELETE命令失败: {response}")
return False
response = send_command(sock, get_cmd, expect_data=True)
if "END" in response and "VALUE" not in response:
print("删除验证成功")
else:
print(f"删除验证失败: {response}")
return False
sock.close()
return True
except Exception as e:
print(f"测试过程中出错: {e}")
return False
def main():
print("缓存功能测试")
print("=" * 40)
success = test_cache_operations()
if success:
print("\n缓存功能测试通过!")
return 0
else:
print("\n缓存功能测试失败")
return 1
if __name__ == "__main__":
sys.exit(main())