from __future__ import annotations
import hashlib
import logging
import os
import sys
from pathlib import Path
from typing import Optional
from agent_vault.crypto import decrypt_secret, load_identity, load_identity_from_str
from agent_vault.errors import (
NotAuthorizedError,
SecretNotFoundError,
VaultNotFoundError,
)
from agent_vault.manifest import Manifest
from agent_vault.metadata import SecretMetadata
logger = logging.getLogger("agent_vault")
def _resolve_repo_path(repo_path: str | Path) -> Path:
path_str = str(repo_path)
if not any(
path_str.startswith(prefix)
for prefix in ("https://", "git@", "ssh://", "git://")
):
return Path(repo_path).expanduser().resolve()
url_hash = hashlib.sha256(path_str.encode()).hexdigest()[:16]
cache_dir = Path.home() / ".agent-vault" / "cache" / url_hash
try:
import git as gitmodule
if cache_dir.exists() and (cache_dir / ".git").exists():
try:
repo = gitmodule.Repo(str(cache_dir))
if repo.remotes:
repo.remotes[0].pull(rebase=False)
except Exception as e:
logger.warning("git pull failed for cached repo: %s", e)
print(
f"Warning: git pull failed for cached repo: {e}",
file=sys.stderr,
)
else:
cache_dir.parent.mkdir(parents=True, exist_ok=True)
gitmodule.Repo.clone_from(path_str, str(cache_dir))
except Exception as e:
raise VaultNotFoundError(f"Failed to clone/update {path_str}: {e}") from e
return cache_dir
class Vault:
def __init__(
self,
repo_path: str | Path,
key_path: Optional[str | Path] = None,
key_str: Optional[str] = None,
auto_pull: bool = True,
):
self._repo_path = _resolve_repo_path(repo_path)
self._vault_dir = self._repo_path / ".agent-vault"
self._auto_pull = auto_pull
if not self._vault_dir.is_dir():
raise VaultNotFoundError(
f"No vault found at {self._repo_path}. "
"Run 'agent-vault init' first."
)
if key_str is not None:
self._identity = load_identity_from_str(key_str)
elif key_path is not None:
self._identity = load_identity(str(Path(key_path).expanduser()))
elif os.environ.get("AGENT_VAULT_KEY"):
self._identity = load_identity_from_str(os.environ["AGENT_VAULT_KEY"])
else:
default_key = Path.home() / ".agent-vault" / "owner.key"
if default_key.exists():
self._identity = load_identity(str(default_key))
else:
raise VaultNotFoundError(
"No key provided. Pass key_path=, key_str=, "
"set AGENT_VAULT_KEY env var, or ensure "
"~/.agent-vault/owner.key exists."
)
self._manifest = Manifest.load(self._vault_dir / "manifest.yaml")
def pull(self) -> None:
try:
import git
repo = git.Repo(str(self._repo_path))
if repo.remotes:
origin = repo.remotes[0]
origin.pull(rebase=False)
except Exception as e:
logger.warning("git pull failed (continuing with local state): %s", e)
print(f"Warning: git pull failed: {e}", file=sys.stderr)
def get(self, secret_path: str) -> str:
if self._auto_pull:
self.pull()
enc_path = self._vault_dir / "secrets" / _to_file_path(secret_path, ".enc")
if not enc_path.exists():
raise SecretNotFoundError(f"Secret not found: {secret_path}")
ciphertext = enc_path.read_bytes()
try:
return decrypt_secret(ciphertext, self._identity)
except Exception as e:
raise NotAuthorizedError(
f"Cannot decrypt '{secret_path}': {e}"
) from e
def list_secrets(self, group: Optional[str] = None) -> list[SecretMetadata]:
secrets_dir = self._vault_dir / "secrets"
if not secrets_dir.exists():
return []
results = []
for meta_path in sorted(secrets_dir.rglob("*.meta")):
try:
meta = SecretMetadata.load(meta_path)
if group is None or meta.group == group:
results.append(meta)
except Exception:
continue
return results
def list_agents(self) -> list[dict]:
return self._manifest.list_agents()
@property
def manifest(self) -> Manifest:
return self._manifest
def reload(self) -> None:
self._manifest = Manifest.load(self._vault_dir / "manifest.yaml")
def __enter__(self) -> "Vault":
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
return False
def _to_file_path(secret_path: str, suffix: str) -> Path:
parts = secret_path.split("/")
if len(parts) < 2:
return Path(parts[0] + suffix)
return Path(*parts[:-1]) / (parts[-1] + suffix)