use std::collections::HashMap;
use chrono::Utc;
use crate::crypto::EnvelopeCipher;
use crate::db::models::{
CredentialCreateRequest, CredentialEntry, CredentialFilter, CredentialListResponse,
CredentialResponse, KeychainSetRequest,
};
use crate::db::queries::catalog as catalog_queries;
use crate::db::queries::credential as queries;
use crate::db::DbPool;
use crate::error::{AppError, AppResult};
use crate::playbook::types::Playbook;
use crate::secrets::{build_secret_provider, resolve_keychain_entry};
use crate::services::keychain::KeychainService;
// Passed as `expires_in` when a freshly resolved keychain entry is written to
// the keychain cache (the name indicates the unit is seconds).
const KEYCHAIN_CACHE_TTL_SECS: i64 = 600;
// Scope used both when reading cached keychain entries and when writing them
// back after a fresh resolution.
const KEYCHAIN_CACHE_SCOPE: &str = "local";
/// Service that stores, lists, reads and deletes credentials, and that can
/// fall back to resolving a keychain alias for a running execution.
#[derive(Clone)]
pub struct CredentialService {
/// Database pool handed to the credential, catalog and event queries.
pool: DbPool,
/// Cipher used to seal credential data before storage and to open it on read.
cipher: EnvelopeCipher,
/// Keychain service used to read and write cached keychain entries.
keychain: KeychainService,
}
impl CredentialService {
pub fn new(pool: DbPool, cipher: EnvelopeCipher, keychain: KeychainService) -> Self {
Self {
pool,
cipher,
keychain,
}
}
/// Creates a credential, or updates the one already stored under
/// `request.name`.
///
/// `request.data` is sealed with the envelope cipher before anything is
/// written. On update, the type, sealed data, meta, tags and description are
/// replaced; the name is the lookup key and is not changed. After the write
/// the row is re-read by id and returned without its data.
///
/// # Errors
///
/// Propagates cipher and query failures. Returns `AppError::Internal` when
/// the row cannot be re-read after the write.
pub async fn create_or_update(
    &self,
    request: CredentialCreateRequest,
) -> AppResult<CredentialResponse> {
    let encrypted_data = self.cipher.seal_json_to_storage(&request.data).await?;
    let existing = queries::get_credential_by_name(&self.pool, &request.name).await?;

    // Both paths end the same way (re-read the row by id); they differ only in
    // the write that is performed and the message used if the re-read fails.
    let (id, refetch_failure) = match existing {
        Some(current) => {
            queries::update_credential(
                &self.pool,
                current.id,
                &request.credential_type,
                &encrypted_data,
                request.meta.as_ref(),
                request.tags.as_deref(),
                request.description.as_deref(),
            )
            .await?;
            (current.id, "Failed to fetch updated credential")
        }
        None => {
            let new_id = queries::insert_credential(
                &self.pool,
                &request.name,
                &request.credential_type,
                &encrypted_data,
                request.meta.as_ref(),
                request.tags.as_deref(),
                request.description.as_deref(),
            )
            .await?;
            (new_id, "Failed to fetch created credential")
        }
    };

    let stored = queries::get_credential_by_id(&self.pool, id)
        .await?
        .ok_or_else(|| AppError::Internal(refetch_failure.to_string()))?;
    Ok(self.entry_to_response(stored, None))
}
/// Fetches a credential by id or name.
///
/// * `identifier` - id or name, resolved by `find_credential`.
/// * `include_data` - when `true`, the stored payload is opened with the
///   cipher and attached to the response; otherwise `data` is `None`.
/// * `execution_id` - when the credential is not stored, `include_data` is
///   `true` and this is `Some`, `identifier` is tried as a keychain alias for
///   that execution.
///
/// # Errors
///
/// Returns `AppError::NotFound` when neither the store nor the keychain
/// fallback yields a value; any other lookup, cipher or resolution error is
/// propagated unchanged.
pub async fn get(
    &self,
    identifier: &str,
    include_data: bool,
    execution_id: Option<i64>,
) -> AppResult<CredentialResponse> {
    let entry = match self.find_credential(identifier).await {
        Ok(found) => found,
        // The keychain fallback is only attempted when the caller wants the data.
        Err(AppError::NotFound(_)) if include_data => {
            if let Some(exec_id) = execution_id {
                let resolved = self.try_resolve_keychain(exec_id, identifier).await?;
                if let Some(secret) = resolved {
                    return Ok(self.keychain_response(identifier, secret));
                }
            }
            return Err(AppError::NotFound(format!(
                "Credential '{}' not found",
                identifier
            )));
        }
        Err(other) => return Err(other),
    };

    let data = if include_data {
        Some(self.cipher.open_storage_json(&entry.data).await?)
    } else {
        None
    };
    Ok(self.entry_to_response(entry, data))
}
async fn try_resolve_keychain(
&self,
execution_id: i64,
alias: &str,
) -> AppResult<Option<serde_json::Value>> {
let info: Option<(i64, Option<serde_json::Value>)> = sqlx::query_as(
r#"
SELECT catalog_id, context->'workload' as workload
FROM noetl.event
WHERE execution_id = $1
AND event_type IN ('playbook.initialized', 'playbook_started')
LIMIT 1
"#,
)
.bind(execution_id)
.fetch_optional(&self.pool)
.await?;
let Some((catalog_id, workload)) = info else {
return Ok(None);
};
match self
.keychain
.get(catalog_id, alias, Some(execution_id), KEYCHAIN_CACHE_SCOPE)
.await
{
Ok(c) if c.status == "found" => {
if let Some(data) = c.data {
tracing::debug!(execution_id, alias, "keychain.cache_hit");
return Ok(Some(data));
}
}
Ok(_) => {} Err(e) => {
tracing::warn!(execution_id, alias, error = %e, "keychain.cache_read failed; resolving fresh")
}
}
let Some(entry) = catalog_queries::get_catalog_by_id(&self.pool, catalog_id).await? else {
return Ok(None);
};
let playbook: Playbook = match serde_yaml::from_str(&entry.content) {
Ok(pb) => pb,
Err(e) => {
tracing::warn!(execution_id, error = %e, "keychain resolve: playbook parse failed");
return Ok(None);
}
};
let Some(kc) = playbook.find_keychain(alias) else {
return Ok(None);
};
let Some(provider_id) = kc.provider.as_deref() else {
return Ok(None);
};
let workload_map: HashMap<String, serde_json::Value> = workload
.as_ref()
.and_then(|w| w.as_object())
.map(|m| m.clone().into_iter().collect())
.unwrap_or_default();
let provider = build_secret_provider(provider_id)?;
tracing::info!(
execution_id,
alias,
provider = provider_id,
"keychain.resolve"
);
let data = resolve_keychain_entry(kc, &workload_map, &*provider).await?;
let set_req = KeychainSetRequest {
data: data.clone(),
scope_type: KEYCHAIN_CACHE_SCOPE.to_string(),
execution_id: Some(execution_id),
expires_at: None,
expires_in: Some(KEYCHAIN_CACHE_TTL_SECS),
auto_renew: false,
renew_config: None,
};
if let Err(e) = self.keychain.set(catalog_id, alias, set_req).await {
tracing::warn!(execution_id, alias, error = %e, "keychain.cache_write failed");
}
Ok(Some(data))
}
/// Wraps keychain-resolved `data` in a `CredentialResponse`.
///
/// The response is synthetic: the id is the literal `"0"`, the type is
/// `"keychain"`, meta and tags are absent, and both timestamps are set to the
/// current UTC time.
fn keychain_response(&self, alias: &str, data: serde_json::Value) -> CredentialResponse {
    let resolved_at = Utc::now();
    CredentialResponse {
        id: String::from("0"),
        name: alias.to_owned(),
        credential_type: String::from("keychain"),
        meta: None,
        tags: None,
        description: Some(String::from("resolved from keychain provider")),
        data: Some(data),
        created_at: resolved_at,
        updated_at: resolved_at,
    }
}
/// Lists credentials, optionally narrowed by type and/or a search string.
///
/// Items never carry credential data. The response's `filter` echoes the
/// arguments and is `None` only when both arguments are `None`.
///
/// # Errors
///
/// Propagates query failures.
pub async fn list(
    &self,
    credential_type: Option<&str>,
    search: Option<&str>,
) -> AppResult<CredentialListResponse> {
    let rows = queries::list_credentials(&self.pool, credential_type, search).await?;
    let items = rows
        .into_iter()
        .map(|row| self.entry_to_response(row, None))
        .collect::<Vec<CredentialResponse>>();

    let filter = match (credential_type, search) {
        (None, None) => None,
        (kind, query) => Some(CredentialFilter {
            credential_type: kind.map(str::to_owned),
            q: query.map(str::to_owned),
        }),
    };

    Ok(CredentialListResponse { items, filter })
}
/// Deletes the credential identified by id or name and returns the deleted
/// id as a string.
///
/// # Errors
///
/// Propagates lookup errors from `find_credential` (including
/// `AppError::NotFound`) and query failures. Returns `AppError::Internal`
/// when the delete query reports that nothing was removed.
pub async fn delete(&self, identifier: &str) -> AppResult<String> {
    let target_id = self.find_credential(identifier).await?.id;
    let removed = queries::delete_credential_by_id(&self.pool, target_id).await?;
    if !removed {
        return Err(AppError::Internal(
            "Failed to delete credential".to_string(),
        ));
    }
    Ok(target_id.to_string())
}
/// Resolves `identifier` to a stored credential.
///
/// If `identifier` parses as an `i64` it is first tried as an id; when that
/// yields nothing (or it is not numeric) it is looked up as a name.
///
/// # Errors
///
/// Returns `AppError::NotFound` when no credential matches; query failures
/// are propagated.
async fn find_credential(&self, identifier: &str) -> AppResult<CredentialEntry> {
    let by_id = match identifier.parse::<i64>() {
        Ok(numeric_id) => queries::get_credential_by_id(&self.pool, numeric_id).await?,
        Err(_) => None,
    };
    if let Some(entry) = by_id {
        return Ok(entry);
    }

    // Numeric-looking identifiers that match no id still get a by-name lookup.
    match queries::get_credential_by_name(&self.pool, identifier).await? {
        Some(entry) => Ok(entry),
        None => Err(AppError::NotFound(format!(
            "Credential '{}' not found",
            identifier
        ))),
    }
}
fn entry_to_response(
&self,
entry: CredentialEntry,
data: Option<serde_json::Value>,
) -> CredentialResponse {
CredentialResponse {
id: entry.id.to_string(),
name: entry.name,
credential_type: entry.credential_type,
meta: entry.meta,
tags: entry.tags,
description: entry.description,
data,
created_at: entry.created_at,
updated_at: entry.updated_at,
}
}
}