/// Name of the environment variable that carries the secret delivery mode.
/// The unix `RuntimeSecretSession::env_pairs` pairs it with the session's delivery mode
/// string (`"runtimeBroker"` or `"runtimeLeasedBroker"`).
// NOTE(review): `dead_code` is presumably allowed because the non-unix build never
// references these constants — confirm.
#[allow(dead_code)]
pub const CELLOS_SECRET_DELIVERY_MODE_ENV: &str = "CELLOS_SECRET_DELIVERY";
/// Name of the environment variable that carries the path of the runtime secret socket.
/// The unix `cli_main` reads it to find the Unix socket to connect to.
#[allow(dead_code)]
pub const CELLOS_SECRET_RUNTIME_SOCKET_ENV: &str = "CELLOS_RUNTIME_SECRET_SOCKET";
/// Name of the environment variable that carries the session token.
/// The unix `cli_main` reads it and sends the token with every request.
#[allow(dead_code)]
pub const CELLOS_SECRET_RUNTIME_TOKEN_ENV: &str = "CELLOS_RUNTIME_SECRET_TOKEN";
/// One secret supplied by the caller when starting a snapshot-backed session.
///
/// `Debug` is implemented by hand instead of derived so that the secret `value` is never
/// rendered into logs or panic messages; only `key` and `ttl_seconds` are shown.
#[derive(Clone)]
#[allow(dead_code)]
pub struct RuntimeSecretEntryInput {
    /// Name under which the secret can be requested.
    pub key: String,
    /// Secret material, held in a `zeroize::Zeroizing` wrapper.
    pub value: zeroize::Zeroizing<String>,
    /// Requested lifetime in seconds, counted from the moment the session is started.
    pub ttl_seconds: u64,
}

impl std::fmt::Debug for RuntimeSecretEntryInput {
    /// Formats the entry with the secret value replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RuntimeSecretEntryInput")
            .field("key", &self.key)
            .field("value", &"<redacted>")
            .field("ttl_seconds", &self.ttl_seconds)
            .finish()
    }
}
#[cfg(unix)]
mod imp {
use std::collections::HashMap;
use std::ffi::OsString;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{anyhow, Context};
use cellos_core::ports::SecretBroker;
use cellos_core::RuntimeSecretLeaseRequest;
use serde::{Deserialize, Serialize};
use tempfile::TempDir;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::oneshot;
use super::{
RuntimeSecretEntryInput, CELLOS_SECRET_DELIVERY_MODE_ENV, CELLOS_SECRET_RUNTIME_SOCKET_ENV,
CELLOS_SECRET_RUNTIME_TOKEN_ENV,
};
/// A running runtime-secret broker bound to a private Unix socket.
///
/// `Debug` is implemented by hand instead of derived so that the bearer `token` is never
/// rendered; every other field is printed as the derive would print it.
pub struct RuntimeSecretSession {
    /// Temporary directory that contains the socket; held for the session's lifetime.
    _tempdir: TempDir,
    /// Path of the listening socket inside `_tempdir`.
    socket_path: PathBuf,
    /// Token every client request must present.
    token: String,
    /// Delivery mode string exported to the child environment.
    delivery_mode: &'static str,
    /// Signals the accept loop to stop; taken (set to `None`) by `shutdown`.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Handle of the spawned accept loop.
    task: tokio::task::JoinHandle<()>,
}

impl std::fmt::Debug for RuntimeSecretSession {
    /// Formats the session with the token replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RuntimeSecretSession")
            .field("_tempdir", &self._tempdir)
            .field("socket_path", &self.socket_path)
            .field("token", &"<redacted>")
            .field("delivery_mode", &self.delivery_mode)
            .field("shutdown_tx", &self.shutdown_tx)
            .field("task", &self.task)
            .finish()
    }
}
/// A snapshot secret together with the instant after which it is no longer served.
///
/// `Debug` is implemented by hand instead of derived so that the secret `value` is never
/// rendered.
#[derive(Clone)]
struct RuntimeSecretEntry {
    /// Secret material, held in a `zeroize::Zeroizing` wrapper.
    value: zeroize::Zeroizing<String>,
    /// Requests arriving after this instant are answered with "secret expired".
    expires_at: Instant,
}

impl std::fmt::Debug for RuntimeSecretEntry {
    /// Formats the entry with the secret value replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RuntimeSecretEntry")
            .field("value", &"<redacted>")
            .field("expires_at", &self.expires_at)
            .finish()
    }
}
/// Per-key lease parameters for a broker-backed session; holds no secret material.
#[derive(Debug, Clone)]
struct RuntimeSecretLeaseEntry {
// TTL in seconds exactly as requested; forwarded to the broker on every fetch.
ttl_seconds: u64,
// Requests arriving after this instant are answered with "secret expired".
expires_at: Instant,
}
/// Where `handle_client` obtains secret values from.
#[derive(Clone)]
enum RuntimeSecretSource {
/// Values captured when the session started, keyed by secret name.
Snapshot(Arc<HashMap<String, RuntimeSecretEntry>>),
/// Values fetched from `broker` via `fetch_runtime_secret` on each authorized request.
Leased {
// Broker asked for the value of a known, unexpired key.
broker: Arc<dyn SecretBroker>,
// Cell identifier passed to the broker with every fetch.
cell_id: String,
// Keys that may be requested, with their TTL and expiry instant.
entries: Arc<HashMap<String, RuntimeSecretLeaseEntry>>,
},
}
/// Request sent by the CLI helper to the broker as one line of JSON.
#[derive(Serialize, Deserialize)]
struct SecretGetRequest {
// Session token; compared against the expected token before any lookup happens.
token: String,
// Name of the secret being requested.
key: String,
}
impl std::fmt::Debug for SecretGetRequest {
    /// Renders the request for diagnostics with the token masked; the key is shown as is.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure so the token is visibly ignored rather than merely not mentioned.
        let Self { token: _, key } = self;
        let mut rendered = f.debug_struct("SecretGetRequest");
        rendered.field("token", &"<redacted>");
        rendered.field("key", key);
        rendered.finish()
    }
}
/// Reply sent by the broker as one line of JSON.
/// `handle_client` fills in `value` on success and `error` otherwise; a field that is
/// `None` is left out of the serialized JSON.
#[derive(Serialize, Deserialize)]
struct SecretGetResponse {
// The secret value, present only when the request succeeded.
#[serde(skip_serializing_if = "Option::is_none")]
value: Option<String>,
// Human-readable failure reason (e.g. "unauthorized", "secret expired").
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
}
impl std::fmt::Debug for SecretGetResponse {
    /// Renders the response for diagnostics: a present value is shown only as a
    /// placeholder, while the error text is printed unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None shape so a reader can still tell whether a value was sent.
        let masked_value: Option<&str> = match self.value {
            Some(_) => Some("<redacted>"),
            None => None,
        };
        let mut rendered = f.debug_struct("SecretGetResponse");
        rendered.field("value", &masked_value);
        rendered.field("error", &self.error);
        rendered.finish()
    }
}
/// Compares two byte strings without returning early at the first difference.
///
/// Every position up to the longer length is visited; positions missing from the shorter
/// input are read as zero, and a length mismatch alone already marks the inputs unequal.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    let longest = usize::max(a.len(), b.len());
    // Seed the accumulator with the length check so padded comparisons cannot match.
    let length_mismatch = (a.len() != b.len()) as u8;
    let accumulated = (0..longest).fold(length_mismatch, |seen, index| {
        let left = a.get(index).copied().unwrap_or(0);
        let right = b.get(index).copied().unwrap_or(0);
        seen | (left ^ right)
    });
    accumulated == 0
}
impl RuntimeSecretSession {
pub async fn start_snapshot(
cell_id: &str,
entries: &[RuntimeSecretEntryInput],
) -> anyhow::Result<Self> {
let secret_map: HashMap<String, RuntimeSecretEntry> = entries
.iter()
.map(|entry| {
(
entry.key.clone(),
RuntimeSecretEntry {
value: entry.value.clone(),
expires_at: Instant::now()
+ Duration::from_secs(entry.ttl_seconds.max(1)),
},
)
})
.collect();
Self::start_with_source(
cell_id,
RuntimeSecretSource::Snapshot(Arc::new(secret_map)),
"runtimeBroker",
)
.await
}
pub async fn start_leased(
cell_id: &str,
broker: Arc<dyn SecretBroker>,
requests: &[RuntimeSecretLeaseRequest],
) -> anyhow::Result<Self> {
let lease_map: HashMap<String, RuntimeSecretLeaseEntry> = requests
.iter()
.map(|entry| {
(
entry.key.clone(),
RuntimeSecretLeaseEntry {
ttl_seconds: entry.ttl_seconds,
expires_at: Instant::now()
+ Duration::from_secs(entry.ttl_seconds.max(1)),
},
)
})
.collect();
Self::start_with_source(
cell_id,
RuntimeSecretSource::Leased {
broker,
cell_id: cell_id.to_string(),
entries: Arc::new(lease_map),
},
"runtimeLeasedBroker",
)
.await
}
/// Creates the private socket, spawns the accept loop and returns the session handle.
///
/// The socket lives in a fresh temporary directory under `/tmp`. Each accepted connection
/// is served by its own spawned task running `handle_client`. The loop ends when the
/// shutdown channel fires (or its sender is dropped) or when `accept` fails.
///
/// # Errors
///
/// Returns an error when the temporary directory cannot be created or the socket cannot
/// be bound.
async fn start_with_source(
    cell_id: &str,
    source: RuntimeSecretSource,
    delivery_mode: &'static str,
) -> anyhow::Result<Self> {
    // NOTE(review): `/tmp` is presumably hard-coded (rather than `std::env::temp_dir()`)
    // to keep the socket path short enough for the Unix socket path limit — confirm.
    let tempdir_root = PathBuf::from("/tmp");
    let tempdir = tempfile::Builder::new()
        .prefix("cellos-rs-")
        .tempdir_in(&tempdir_root)
        .with_context(|| {
            format!(
                "create runtime secret tempdir for cell {cell_id} under {}",
                tempdir_root.display()
            )
        })?;
    let socket_path = tempdir.path().join("broker.sock");
    let listener = UnixListener::bind(&socket_path)
        .with_context(|| format!("bind runtime secret socket {}", socket_path.display()))?;
    let token = uuid::Uuid::new_v4().to_string();
    let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
    let task_token = token.clone();
    let task = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut shutdown_rx => {
                    break;
                }
                accept = listener.accept() => {
                    let stream = match accept {
                        Ok((stream, _)) => stream,
                        Err(err) => {
                            // Stopping is kept (retrying could spin on a persistent
                            // failure), but the reason is recorded so later client
                            // failures can be traced back to it.
                            tracing::warn!(
                                target: "cellos.supervisor.runtime_secret",
                                error = %err,
                                "runtime secret listener accept failed; broker stopped"
                            );
                            break;
                        }
                    };
                    let source = source.clone();
                    let token = task_token.clone();
                    tokio::spawn(async move {
                        if let Err(err) = handle_client(stream, &token, source).await {
                            tracing::debug!(
                                target: "cellos.supervisor.runtime_secret",
                                error = %err,
                                "runtime secret request failed"
                            );
                        }
                    });
                }
            }
        }
    });
    Ok(Self {
        _tempdir: tempdir,
        socket_path,
        token,
        delivery_mode,
        shutdown_tx: Some(shutdown_tx),
        task,
    })
}
/// Returns the three environment variables a child process needs to reach this session:
/// delivery mode, socket path and token, in that order.
pub fn env_pairs(&self) -> [(String, String); 3] {
    let mode_pair = (
        String::from(CELLOS_SECRET_DELIVERY_MODE_ENV),
        String::from(self.delivery_mode),
    );
    let socket_pair = (
        String::from(CELLOS_SECRET_RUNTIME_SOCKET_ENV),
        self.socket_path.display().to_string(),
    );
    let token_pair = (
        String::from(CELLOS_SECRET_RUNTIME_TOKEN_ENV),
        self.token.clone(),
    );
    [mode_pair, socket_pair, token_pair]
}
/// Asks the accept loop to stop and waits until its task has finished.
///
/// Only the accept loop is awaited; per-connection tasks are spawned separately by the
/// loop and are not joined here.
pub async fn shutdown(mut self) {
    let stop_signal = self.shutdown_tx.take();
    if let Some(sender) = stop_signal {
        // A failed send means the receiver is gone, i.e. the loop has already exited.
        let _ = sender.send(());
    }
    // The join outcome (including a panicked or cancelled task) is deliberately ignored.
    let _ = self.task.await;
}
}
async fn handle_client(
stream: UnixStream,
expected_token: &str,
source: RuntimeSecretSource,
) -> anyhow::Result<()> {
let (read_half, mut write_half) = stream.into_split();
let mut reader = BufReader::new(read_half);
let mut line = String::new();
let n = reader
.read_line(&mut line)
.await
.context("read request line")?;
if n == 0 {
return Ok(());
}
let req: SecretGetRequest =
serde_json::from_str(&line).context("parse runtime secret request")?;
let response = if !constant_time_eq(req.token.as_bytes(), expected_token.as_bytes()) {
SecretGetResponse {
value: None,
error: Some("unauthorized".into()),
}
} else {
match source {
RuntimeSecretSource::Snapshot(secrets) => {
if let Some(entry) = secrets.get(&req.key) {
if Instant::now() > entry.expires_at {
SecretGetResponse {
value: None,
error: Some("secret expired".into()),
}
} else {
SecretGetResponse {
value: Some(entry.value.as_str().to_string()),
error: None,
}
}
} else {
SecretGetResponse {
value: None,
error: Some(format!("unknown secret key {:?}", req.key)),
}
}
}
RuntimeSecretSource::Leased {
broker,
cell_id,
entries,
} => {
if let Some(entry) = entries.get(&req.key) {
if Instant::now() > entry.expires_at {
SecretGetResponse {
value: None,
error: Some("secret expired".into()),
}
} else {
match broker
.fetch_runtime_secret(&req.key, &cell_id, entry.ttl_seconds)
.await
{
Ok(view) => SecretGetResponse {
value: Some(view.value.as_str().to_string()),
error: None,
},
Err(err) => SecretGetResponse {
value: None,
error: Some(err.to_string()),
},
}
}
} else {
SecretGetResponse {
value: None,
error: Some(format!("unknown secret key {:?}", req.key)),
}
}
}
}
};
let payload = serde_json::to_string(&response).context("encode runtime secret response")?;
write_half
.write_all(payload.as_bytes())
.await
.context("write runtime secret response")?;
write_half
.write_all(b"\n")
.await
.context("write runtime secret response newline")?;
Ok(())
}
pub async fn cli_main(args: &[OsString]) -> anyhow::Result<()> {
match args {
[command, key] if command == "get" => {
let key = key.to_string_lossy().to_string();
let socket_path = std::env::var(CELLOS_SECRET_RUNTIME_SOCKET_ENV)
.with_context(|| format!("{CELLOS_SECRET_RUNTIME_SOCKET_ENV} not set"))?;
let token = std::env::var(CELLOS_SECRET_RUNTIME_TOKEN_ENV)
.with_context(|| format!("{CELLOS_SECRET_RUNTIME_TOKEN_ENV} not set"))?;
let mut stream = UnixStream::connect(&socket_path)
.await
.with_context(|| format!("connect runtime secret socket {socket_path}"))?;
let request = SecretGetRequest { token, key };
let payload =
serde_json::to_string(&request).context("encode runtime secret request")?;
stream
.write_all(payload.as_bytes())
.await
.context("write runtime secret request")?;
stream
.write_all(b"\n")
.await
.context("write runtime secret request newline")?;
let mut reader = BufReader::new(stream);
let mut line = String::new();
reader
.read_line(&mut line)
.await
.context("read runtime secret response")?;
let response: SecretGetResponse =
serde_json::from_str(&line).context("parse runtime secret response")?;
if let Some(error) = response.error {
return Err(anyhow!(error));
}
if let Some(value) = response.value {
print!("{value}");
return Ok(());
}
Err(anyhow!("runtime secret broker returned empty response"))
}
_ => Err(anyhow!(
"usage: cellos-supervisor secret get <KEY> (requires runtimeBroker or runtimeLeasedBroker secret delivery)"
)),
}
}
// Unit tests for the token comparison helper and for the redacting `Debug` impls of the
// wire types.
#[cfg(test)]
mod wave2_tests {
use super::{constant_time_eq, SecretGetRequest, SecretGetResponse};
// The comparison must agree with ordinary equality, including when lengths differ or
// one side is empty.
#[test]
fn constant_time_eq_matches_logical_equality() {
assert!(constant_time_eq(b"", b""));
assert!(constant_time_eq(b"abc", b"abc"));
assert!(!constant_time_eq(b"abc", b"abd"));
assert!(!constant_time_eq(b"abc", b"abcd"));
assert!(!constant_time_eq(b"abcd", b"abc"));
assert!(!constant_time_eq(b"", b"x"));
assert!(!constant_time_eq(b"x", b""));
}
// The token must not appear in the debug output; the key stays visible.
#[test]
fn secret_get_request_debug_redacts_token() {
let req = SecretGetRequest {
token: "RUNTIME-T2-WAVE2-SENTINEL".into(),
key: "DB_PASSWORD".into(),
};
let dbg = format!("{req:?}");
assert!(!dbg.contains("RUNTIME-T2-WAVE2-SENTINEL"));
assert!(dbg.contains("DB_PASSWORD"));
}
// A present value must not appear in the debug output; an error message is shown, and
// the placeholder is not printed when there is no value at all.
#[test]
fn secret_get_response_debug_redacts_value() {
let resp = SecretGetResponse {
value: Some("RESPONSE-T2-WAVE2-SENTINEL".into()),
error: None,
};
let dbg = format!("{resp:?}");
assert!(!dbg.contains("RESPONSE-T2-WAVE2-SENTINEL"));
let resp_none = SecretGetResponse {
value: None,
error: Some("unauthorized".into()),
};
let dbg_none = format!("{resp_none:?}");
assert!(dbg_none.contains("unauthorized"));
assert!(!dbg_none.contains("<redacted>"));
}
}
}
#[cfg(not(unix))]
mod imp {
use anyhow::anyhow;
use cellos_core::ports::SecretBroker;
use cellos_core::RuntimeSecretLeaseRequest;
use std::sync::Arc;
use super::RuntimeSecretEntryInput;
/// Placeholder session type for platforms without Unix domain sockets.
/// Every constructor defined on it in this module returns an error.
#[derive(Debug)]
pub struct RuntimeSecretSession;
impl RuntimeSecretSession {
    /// Always fails: snapshot delivery is unavailable on this platform.
    pub async fn start(
        _cell_id: &str,
        _entries: &[RuntimeSecretEntryInput],
    ) -> anyhow::Result<Self> {
        let unsupported = anyhow!(
            "runtimeBroker secret delivery requires unix domain sockets and is not supported on this platform"
        );
        Err(unsupported)
    }

    /// Same as [`Self::start`]; present so both platforms offer the same constructor name.
    pub async fn start_snapshot(
        cell_id: &str,
        entries: &[RuntimeSecretEntryInput],
    ) -> anyhow::Result<Self> {
        Self::start(cell_id, entries).await
    }

    /// Always fails: leased delivery is unavailable on this platform.
    pub async fn start_leased(
        _cell_id: &str,
        _broker: Arc<dyn SecretBroker>,
        _requests: &[RuntimeSecretLeaseRequest],
    ) -> anyhow::Result<Self> {
        let unsupported = anyhow!(
            "runtimeLeasedBroker requires unix domain sockets and is not supported on this platform"
        );
        Err(unsupported)
    }

    /// Returns no environment variables, since there is no session to connect to.
    pub fn env_pairs(&self) -> [(String, String); 0] {
        []
    }

    /// Does nothing; there is no background task to stop.
    pub async fn shutdown(self) {}
}
/// Always fails: the `secret get` helper needs Unix domain sockets, which this platform
/// does not provide. The arguments are ignored.
pub async fn cli_main(_args: &[std::ffi::OsString]) -> anyhow::Result<()> {
    let unsupported = anyhow!(
        "runtime secret helper requires unix domain sockets and is not supported on this platform"
    );
    Err(unsupported)
}
}
pub use imp::{cli_main, RuntimeSecretSession};