#![cfg(feature = "python")]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use pyo3::exceptions::PyException;
use pyo3::prelude::*;
use zeroize::{Zeroize, Zeroizing};
use crate::error::{
MasterKeyError as RustMasterKeyError, PlaceholderError as RustPlaceholderError, SecretshError,
SpawnError, VaultError,
};
use crate::harden::harden_process;
use crate::redact::Redactor;
use crate::spawn::{spawn_child, SpawnConfig};
use crate::tokenizer::tokenize;
use crate::vault::{Vault, VaultConfig};
static HARDENED: AtomicBool = AtomicBool::new(false);
fn ensure_hardened() {
if !HARDENED.swap(true, Ordering::Relaxed) {
harden_process();
}
}
pyo3::create_exception!(
_native,
SecretSHError,
PyException,
"Base exception for all secretsh errors."
);
pyo3::create_exception!(
_native,
VaultNotFoundError,
SecretSHError,
"The vault file does not exist. Run `secretsh init` to create one."
);
pyo3::create_exception!(
_native,
VaultCorruptError,
SecretSHError,
"The vault file is corrupt or has been tampered with."
);
pyo3::create_exception!(
_native,
VaultPermissionError,
SecretSHError,
"The vault file has insecure permissions."
);
pyo3::create_exception!(
_native,
DecryptionError,
SecretSHError,
"Decryption failed — the master passphrase is incorrect or the vault is corrupt."
);
pyo3::create_exception!(
_native,
MasterKeyError,
SecretSHError,
"Master key error — the passphrase environment variable is not set or the passphrase is too short."
);
pyo3::create_exception!(
_native,
PlaceholderError,
SecretSHError,
"A placeholder could not be resolved against the vault."
);
pyo3::create_exception!(
_native,
TokenizationError,
SecretSHError,
"The command string was rejected by the tokenizer."
);
pyo3::create_exception!(
_native,
CommandError,
SecretSHError,
"The child process could not be spawned or was killed by a resource limit."
);
pyo3::create_exception!(
_native,
EntryLimitError,
SecretSHError,
"The vault entry limit (10,000) has been reached."
);
pyo3::create_exception!(
_native,
LockError,
SecretSHError,
"Could not acquire the vault lock."
);
fn to_py_err(e: SecretshError) -> PyErr {
let msg = e.to_string();
match &e {
SecretshError::Vault(ve) => match ve {
VaultError::NotFound { .. } => VaultNotFoundError::new_err(msg),
VaultError::HmacMismatch
| VaultError::CommitTagMismatch
| VaultError::GcmMismatch { .. }
| VaultError::AadMismatch { .. }
| VaultError::BadMagic { .. }
| VaultError::Truncated { .. }
| VaultError::VersionTooNew { .. }
| VaultError::VersionInvalid { .. } => VaultCorruptError::new_err(msg),
VaultError::InsecurePermissions { .. } => VaultPermissionError::new_err(msg),
VaultError::WrongPassphrase => DecryptionError::new_err(msg),
VaultError::EntryLimitExceeded { .. } => EntryLimitError::new_err(msg),
VaultError::LockTimeout { .. } | VaultError::StaleLock { .. } => {
LockError::new_err(msg)
}
},
SecretshError::MasterKey(mke) => match mke {
RustMasterKeyError::EnvVarNotSet { .. }
| RustMasterKeyError::PassphraseTooShort { .. } => MasterKeyError::new_err(msg),
},
SecretshError::Placeholder(RustPlaceholderError::UnresolvedKey { .. }) => {
PlaceholderError::new_err(msg)
}
SecretshError::Tokenization(_) => TokenizationError::new_err(msg),
SecretshError::Spawn(se) => match se {
SpawnError::NotFound { .. }
| SpawnError::NotExecutable { .. }
| SpawnError::ForkExecFailed { .. }
| SpawnError::Timeout { .. }
| SpawnError::OutputLimitExceeded { .. } => CommandError::new_err(msg),
},
SecretshError::Redaction(_) | SecretshError::Io(_) => SecretSHError::new_err(msg),
}
}
#[pyclass(name = "RunResult")]
pub struct PyRunResult {
#[pyo3(get)]
pub stdout: String,
#[pyo3(get)]
pub stderr: String,
#[pyo3(get)]
pub exit_code: i32,
#[pyo3(get)]
pub timed_out: bool,
}
#[pymethods]
impl PyRunResult {
fn __repr__(&self) -> String {
format!(
"RunResult(exit_code={}, timed_out={}, stdout={:?}, stderr={:?})",
self.exit_code,
if self.timed_out { "True" } else { "False" },
if self.stdout.len() > 120 {
format!("{}…", &self.stdout[..120])
} else {
self.stdout.clone()
},
if self.stderr.len() > 120 {
format!("{}…", &self.stderr[..120])
} else {
self.stderr.clone()
},
)
}
}
#[pyclass(name = "Vault")]
pub struct PyVault {
inner: Mutex<Option<Vault>>,
closed: AtomicBool,
}
#[pymethods]
impl PyVault {
#[new]
#[pyo3(signature = (master_key_env, vault_path = None, allow_insecure_permissions = false))]
fn new(
py: Python<'_>,
master_key_env: String,
vault_path: Option<String>,
allow_insecure_permissions: bool,
) -> PyResult<Self> {
ensure_hardened();
let resolved_path = match vault_path {
Some(p) => std::path::PathBuf::from(p),
None => {
let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".to_owned());
std::path::PathBuf::from(home)
.join(".local")
.join("share")
.join("secretsh")
.join("vault.bin")
}
};
let config = VaultConfig {
vault_path: resolved_path,
master_key_env,
allow_insecure_permissions,
kdf_memory: None, };
let vault = py
.allow_threads(|| Vault::open(&config))
.map_err(to_py_err)?;
Ok(PyVault {
inner: Mutex::new(Some(vault)),
closed: AtomicBool::new(false),
})
}
fn set(&self, py: Python<'_>, key: String, value: &Bound<'_, PyAny>) -> PyResult<()> {
self.check_open()?;
let secret_bytes: Zeroizing<Vec<u8>> =
if let Ok(ba) = value.downcast::<pyo3::types::PyByteArray>() {
let data = Zeroizing::new(ba.to_vec());
unsafe {
ba.as_bytes_mut().zeroize();
}
data
} else if let Ok(s) = value.extract::<String>() {
Zeroizing::new(s.into_bytes())
} else if let Ok(b) = value.extract::<Vec<u8>>() {
Zeroizing::new(b)
} else {
return Err(pyo3::exceptions::PyTypeError::new_err(
"value must be str, bytes, or bytearray",
));
};
py.allow_threads(|| {
let mut guard = self.inner.lock().unwrap();
let vault = guard
.as_mut()
.ok_or_else(|| SecretSHError::new_err("vault is closed"))?;
vault.set(&key, &secret_bytes).map_err(to_py_err)
})
}
fn delete(&self, py: Python<'_>, key: String) -> PyResult<bool> {
self.check_open()?;
py.allow_threads(|| {
let mut guard = self.inner.lock().unwrap();
let vault = guard
.as_mut()
.ok_or_else(|| SecretSHError::new_err("vault is closed"))?;
vault.delete(&key).map_err(to_py_err)
})
}
fn list_keys(&self) -> PyResult<Vec<String>> {
self.check_open()?;
let guard = self.inner.lock().unwrap();
let vault = guard
.as_ref()
.ok_or_else(|| SecretSHError::new_err("vault is closed"))?;
Ok(vault.list_keys())
}
#[pyo3(signature = (command, timeout_secs = 300, max_output_bytes = 52428800, max_stderr_bytes = 1048576))]
fn run(
&self,
py: Python<'_>,
command: String,
timeout_secs: u64,
max_output_bytes: usize,
max_stderr_bytes: usize,
) -> PyResult<PyRunResult> {
self.check_open()?;
type ArgvVec = Vec<Zeroizing<Vec<u8>>>;
type SecretsVec = Vec<(String, Vec<u8>)>;
let (resolved_argv, all_secrets): (ArgvVec, SecretsVec) = {
let guard = self.inner.lock().unwrap();
let vault = guard
.as_ref()
.ok_or_else(|| SecretSHError::new_err("vault is closed"))?;
let tokenize_result = tokenize(&command).map_err(to_py_err)?;
let mut argv: Vec<Zeroizing<Vec<u8>>> =
Vec::with_capacity(tokenize_result.tokens.len());
for token in &tokenize_result.tokens {
if token.placeholders.is_empty() {
let mut bytes = token.value.as_bytes().to_vec();
bytes.push(0); argv.push(Zeroizing::new(bytes));
} else {
let token_bytes = token.value.as_bytes();
let mut out_bytes: Vec<u8> = Vec::new();
let mut cursor = 0usize;
for ph in &token.placeholders {
out_bytes.extend_from_slice(&token_bytes[cursor..ph.start]);
let secret = vault.resolve_placeholder(&ph.key).ok_or_else(|| {
to_py_err(SecretshError::Placeholder(
RustPlaceholderError::UnresolvedKey {
key: ph.key.clone(),
},
))
})?;
out_bytes.extend_from_slice(secret);
cursor = ph.end;
}
out_bytes.extend_from_slice(&token_bytes[cursor..]);
out_bytes.push(0);
argv.push(Zeroizing::new(out_bytes));
}
}
let secrets: Vec<(String, Vec<u8>)> = vault
.all_secret_values()
.into_iter()
.map(|(k, v)| (k.to_owned(), v.to_vec()))
.collect();
(argv, secrets)
};
let spawn_result = py.allow_threads(|| {
let secret_refs: Vec<(&str, &[u8])> = all_secrets
.iter()
.map(|(k, v)| (k.as_str(), v.as_slice()))
.collect();
let redactor = Redactor::new(&secret_refs).map_err(to_py_err)?;
let config = SpawnConfig {
timeout_secs,
max_output_bytes,
max_stderr_bytes,
};
spawn_child(resolved_argv, &redactor, &config).map_err(to_py_err)
})?;
Ok(PyRunResult {
stdout: spawn_result.stdout,
stderr: spawn_result.stderr,
exit_code: spawn_result.exit_code,
timed_out: spawn_result.timed_out,
})
}
fn export(&self, py: Python<'_>, out_path: String) -> PyResult<()> {
self.check_open()?;
py.allow_threads(|| {
let guard = self.inner.lock().unwrap();
let vault = guard
.as_ref()
.ok_or_else(|| SecretSHError::new_err("vault is closed"))?;
vault
.export(std::path::Path::new(&out_path))
.map_err(to_py_err)
})
}
#[pyo3(signature = (import_path, overwrite = false, import_key_env = None), name = "import_vault")]
fn import_vault(
&self,
py: Python<'_>,
import_path: String,
overwrite: bool,
import_key_env: Option<String>,
) -> PyResult<(usize, usize, usize)> {
self.check_open()?;
py.allow_threads(|| {
let mut guard = self.inner.lock().unwrap();
let vault = guard
.as_mut()
.ok_or_else(|| SecretSHError::new_err("vault is closed"))?;
vault
.import(
std::path::Path::new(&import_path),
import_key_env.as_deref(),
overwrite,
)
.map_err(to_py_err)
})
}
fn close(&self) {
if !self.closed.swap(true, Ordering::Relaxed) {
let mut guard = self.inner.lock().unwrap();
if let Some(mut vault) = guard.take() {
vault.close();
}
}
}
fn __enter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
fn __exit__(
&self,
_exc_type: &Bound<'_, PyAny>,
_exc_val: &Bound<'_, PyAny>,
_exc_tb: &Bound<'_, PyAny>,
) {
self.close();
}
fn __del__(&self) {
self.close();
}
}
impl PyVault {
fn check_open(&self) -> PyResult<()> {
if self.closed.load(Ordering::Relaxed) {
Err(SecretSHError::new_err("vault is closed"))
} else {
Ok(())
}
}
}
#[pymodule]
fn _native(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyVault>()?;
m.add_class::<PyRunResult>()?;
m.add("SecretSHError", m.py().get_type::<SecretSHError>())?;
m.add(
"VaultNotFoundError",
m.py().get_type::<VaultNotFoundError>(),
)?;
m.add("VaultCorruptError", m.py().get_type::<VaultCorruptError>())?;
m.add(
"VaultPermissionError",
m.py().get_type::<VaultPermissionError>(),
)?;
m.add("DecryptionError", m.py().get_type::<DecryptionError>())?;
m.add("MasterKeyError", m.py().get_type::<MasterKeyError>())?;
m.add("PlaceholderError", m.py().get_type::<PlaceholderError>())?;
m.add("TokenizationError", m.py().get_type::<TokenizationError>())?;
m.add("CommandError", m.py().get_type::<CommandError>())?;
m.add("EntryLimitError", m.py().get_type::<EntryLimitError>())?;
m.add("LockError", m.py().get_type::<LockError>())?;
Ok(())
}