import json
import os
import socket
from pathlib import Path
from typing import Dict, Optional
def get_socket_path() -> Path:
    """Return the path of the lazy-locker agent's Unix socket.

    The path is ``<home>/.config/.lazy-locker/agent.sock``, built from the
    current user's home directory. The file is not checked for existence.
    """
    return Path.home().joinpath(".config", ".lazy-locker", "agent.sock")
def _send_request(request: dict) -> dict:
    """Send one JSON request to the lazy-locker agent and return its reply.

    The request is serialized as a single newline-terminated JSON line and
    written to the agent's Unix socket. The reply is read until the first
    newline (or until the agent closes the connection) and decoded as JSON.

    Args:
        request: JSON-serializable payload to send.

    Returns:
        The decoded JSON reply.

    Raises:
        ConnectionError: If the socket file does not exist, or if the agent
            closes the connection without sending any data.
        OSError: If connecting to, writing to, or reading from the socket
            fails.
        json.JSONDecodeError: If the reply line is not valid JSON.
    """
    socket_path = get_socket_path()
    if not socket_path.exists():
        raise ConnectionError(
            "Agent lazy-locker non démarré. "
            "Lancez 'lazy-locker' et entrez votre passphrase."
        )

    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.connect(str(socket_path))
        sock.sendall((json.dumps(request) + "\n").encode())

        chunks = []
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                # Peer closed the connection.
                break
            chunks.append(chunk)
            # Earlier chunks held no newline, so only the new one needs
            # to be scanned for the end-of-reply marker.
            if b"\n" in chunk:
                break

    # Keep only the first line: anything after the newline is not part of
    # this reply and would make json.loads fail with "Extra data".
    line = b"".join(chunks).partition(b"\n")[0]
    if not line.strip():
        # Report an empty reply as a connection problem instead of letting
        # it surface as an opaque JSON decoding error.
        raise ConnectionError("Réponse vide de l'agent lazy-locker.")
    return json.loads(line.decode())
def is_agent_running() -> bool:
    """Report whether the lazy-locker agent answers a ping.

    Returns:
        True if the agent replies to a ``ping`` request with a JSON object
        whose ``status`` is ``"ok"``. False if the agent cannot be reached,
        the reply cannot be decoded, or the reply carries any other status.
    """
    try:
        response = _send_request({"action": "ping"})
    except (OSError, ValueError):
        # OSError covers ConnectionError (including ConnectionRefusedError),
        # FileNotFoundError and PermissionError. ValueError covers
        # json.JSONDecodeError and UnicodeDecodeError from a malformed reply.
        return False
    # A reply that is valid JSON but not an object has no status to check.
    return isinstance(response, dict) and response.get("status") == "ok"
def get_secrets() -> Dict[str, str]:
    """Fetch all secrets from the lazy-locker agent.

    Returns:
        The ``data`` field of the agent's reply, or an empty dict when the
        reply has no ``data`` field.

    Raises:
        RuntimeError: If the reply's ``status`` is not ``"ok"``; the message
            is the reply's ``message`` field when present.
    """
    reply = _send_request({"action": "get_secrets"})
    if reply.get("status") != "ok":
        raise RuntimeError(reply.get("message", "Erreur inconnue"))
    return reply.get("data", {})
def get_secret(name: str) -> Optional[str]:
    """Fetch a single secret from the lazy-locker agent.

    Args:
        name: Name of the secret, sent as the ``name`` field of the request.

    Returns:
        The ``value`` entry of the reply's ``data`` field, or None when the
        reply's ``status`` is not ``"ok"`` or no ``value`` is present.
    """
    reply = _send_request({"action": "get_secret", "name": name})
    if reply.get("status") != "ok":
        return None
    payload = reply.get("data", {})
    return payload.get("value")
def inject_secrets(prefix: str = "", override: bool = True) -> int:
    """Copy the agent's secrets into ``os.environ``.

    Args:
        prefix: Text prepended to each secret name to form the environment
            variable name; an empty prefix leaves names unchanged.
        override: When True, existing environment variables are overwritten;
            when False, variables that already exist are left untouched.

    Returns:
        The number of environment variables that were set.
    """
    injected = 0
    for key, secret in get_secrets().items():
        target = f"{prefix}{key}" if prefix else key
        # Skip variables that already exist unless overriding is requested.
        if not override and target in os.environ:
            continue
        os.environ[target] = secret
        injected += 1
    return injected
def status() -> dict:
    """Ping the lazy-locker agent and return the data it reports.

    Returns:
        The ``data`` field of the ping reply, or an empty dict when the
        reply has no ``data`` field.

    Raises:
        RuntimeError: If the reply's ``status`` is not ``"ok"``; the message
            is the reply's ``message`` field when present.
    """
    reply = _send_request({"action": "ping"})
    if reply.get("status") != "ok":
        raise RuntimeError(reply.get("message", "Agent non disponible"))
    return reply.get("data", {})
# Alias: ``load_secrets`` is bound to the same function object as
# ``inject_secrets``, so both names behave identically.
load_secrets = inject_secrets