use crate::auth::agent::get_orign_agent_key;
use crate::humans::slack::client::ask_chat_approval;
use crate::mutation::Mutation;
use crate::query::Query;
use crate::{
models::V1UserProfile,
resources::v1::humans::models::{
V1FeedbackRequest, V1FeedbackRequestKind, V1FeedbackResponse, V1Human, V1HumanRequest,
V1HumanStatus, V1Humans,
},
state::AppState,
};
use anyhow::Result;
use axum::{
extract::{Extension, Path, State},
http::StatusCode,
response::IntoResponse,
Json,
};
use chrono::Utc;
use nebulous::client::NebulousClient;
use nebulous::models::V1ResourceMeta;
use nebulous::resources::v1::containers::models::{V1ContainerRequest, V1EnvVar};
use sea_orm::ActiveModelTrait;
use sea_orm::ActiveValue::{NotSet, Set, Unchanged};
use sea_orm::DatabaseConnection;
use sea_orm::IntoActiveModel;
use serde_json::json;
use short_uuid::ShortUuid;
use tracing::{debug, info};
pub async fn create_human(
State(state): State<AppState>,
Extension(user_profile): Extension<V1UserProfile>,
Json(payload): Json<V1HumanRequest>,
) -> impl IntoResponse {
info!("Creating human resource: {:?}", payload);
let db = state.db_pool.clone();
let id = ShortUuid::generate().to_string();
let mut owner_ids: Vec<String> = if let Some(orgs) = &user_profile.organizations {
orgs.keys().cloned().collect()
} else {
Vec::new()
};
owner_ids.push(user_profile.email.clone());
let owner = if payload.metadata.owner.is_empty() {
user_profile.email.clone()
} else {
payload.metadata.owner.clone()
};
if !owner_ids.contains(&owner) {
return (
StatusCode::FORBIDDEN,
Json(json!({ "error": "Unauthorized owner specified" })),
);
}
let now = Utc::now();
let new_human = crate::entities::human::ActiveModel {
id: sea_orm::Set(id.clone()),
name: sea_orm::Set(payload.metadata.name.clone()),
namespace: sea_orm::Set(payload.metadata.namespace.clone()),
owner_id: sea_orm::Set(owner),
medium: sea_orm::Set(payload.medium.clone()),
channel: sea_orm::Set(payload.channel.clone()),
response_job: sea_orm::Set(Some(
serde_json::to_value(payload.response_job.clone()).unwrap(),
)),
status: sea_orm::Set(None),
created_at: sea_orm::Set(now.into()),
updated_at: sea_orm::Set(now.into()),
created_by: sea_orm::Set(user_profile.email.clone()),
labels: sea_orm::Set(payload.metadata.labels.map(|map| serde_json::json!(map))),
..Default::default()
};
let inserted_human: crate::entities::human::Model = match new_human.insert(&db).await {
Ok(model) => model,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Failed to insert Human: {}", err) })),
);
}
};
debug!("Inserted new Human with id {:?}", inserted_human.id);
let response = V1Human {
metadata: nebulous::models::V1ResourceMeta {
id: inserted_human.id,
name: inserted_human.name,
namespace: inserted_human.namespace,
owner: inserted_human.owner_id,
created_at: inserted_human.created_at.timestamp(),
updated_at: inserted_human.updated_at.timestamp(),
created_by: inserted_human.created_by,
labels: inserted_human
.labels
.map(|json_value| serde_json::from_value(json_value).unwrap_or_default()),
owner_ref: None, },
medium: inserted_human.medium,
channel: inserted_human.channel,
response_job: payload.response_job,
status: V1HumanStatus {
is_active: None,
last_active: None,
},
};
(StatusCode::CREATED, Json(json!(response)))
}
pub async fn list_humans(
State(state): State<AppState>,
Extension(user_profile): Extension<V1UserProfile>,
) -> impl IntoResponse {
let db = state.db_pool.clone();
info!("Listing humans for user: {}", user_profile.email);
let mut owner_ids: Vec<String> = if let Some(orgs) = &user_profile.organizations {
orgs.keys().cloned().collect()
} else {
Vec::new()
};
owner_ids.push(user_profile.email.clone());
let owner_id_refs: Vec<&str> = owner_ids.iter().map(|s| s.as_str()).collect();
let human_models = match Query::find_humans_by_owners(&db, &owner_id_refs).await {
Ok(models) => models,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Failed to query humans: {}", err) })),
)
.into_response();
}
};
let mut humans_list = Vec::new();
for human_model in human_models {
let metadata = V1ResourceMeta {
id: human_model.id.clone(),
name: human_model.name.clone(),
namespace: human_model.namespace.clone(),
owner: human_model.owner_id.clone(),
created_at: human_model.created_at.timestamp(),
updated_at: human_model.updated_at.timestamp(),
created_by: human_model.created_by,
labels: human_model.labels.map(|json_value| {
serde_json::from_value::<std::collections::HashMap<String, String>>(json_value)
.unwrap_or_default()
}),
owner_ref: None,
};
let status = V1HumanStatus {
is_active: None,
last_active: None,
};
humans_list.push(V1Human {
metadata,
medium: human_model.medium,
channel: human_model.channel,
response_job: serde_json::from_value(human_model.response_job.unwrap_or_default())
.unwrap_or_default(),
status,
});
}
let response = V1Humans {
humans: humans_list,
};
(StatusCode::OK, Json(response)).into_response()
}
pub async fn get_human(
Path((namespace, name)): Path<(String, String)>,
State(state): State<AppState>,
Extension(user_profile): Extension<V1UserProfile>,
) -> impl IntoResponse {
let mut owner_ids: Vec<String> = if let Some(orgs) = &user_profile.organizations {
orgs.keys().cloned().collect()
} else {
Vec::new()
};
owner_ids.push(user_profile.email.clone());
let owner_id_refs: Vec<&str> = owner_ids.iter().map(|s| s.as_str()).collect();
info!(
"Fetching human with namespace={} and name={}",
namespace, name
);
match Query::find_human_by_name_and_namespace_and_owners(
&state.db_pool,
&name,
&namespace,
&owner_id_refs,
)
.await
{
Ok(Some(human_model)) => {
let response = V1Human {
metadata: V1ResourceMeta {
id: human_model.id,
name: human_model.name,
namespace: human_model.namespace,
owner: human_model.owner_id,
created_at: human_model.created_at.timestamp(),
updated_at: human_model.updated_at.timestamp(),
created_by: human_model.created_by,
labels: human_model
.labels
.map(|json_value| serde_json::from_value(json_value).unwrap_or_default()),
owner_ref: None,
},
medium: human_model.medium,
channel: human_model.channel,
response_job: serde_json::from_value(human_model.response_job.unwrap_or_default())
.unwrap_or_default(),
status: V1HumanStatus {
is_active: None,
last_active: None,
},
};
(StatusCode::OK, Json(response)).into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(json!({ "error": "Human not found" })),
)
.into_response(),
Err(err) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("DB error: {}", err) })),
)
.into_response(),
}
}
pub async fn delete_human(
Path((namespace, name)): Path<(String, String)>,
State(state): State<AppState>,
Extension(user_profile): Extension<crate::models::V1UserProfile>,
) -> impl IntoResponse {
let mut owner_ids: Vec<String> = user_profile
.organizations
.as_ref()
.map(|orgs| orgs.keys().cloned().collect())
.unwrap_or_default();
owner_ids.push(user_profile.email.clone());
let owner_id_refs: Vec<&str> = owner_ids.iter().map(String::as_str).collect();
match Mutation::delete_human_by_name_namespace_and_owners(
&state.db_pool,
&name,
&namespace,
&owner_id_refs,
)
.await
{
Ok(0) => {
(
StatusCode::NOT_FOUND,
Json(json!({"error": "Human not found"})),
)
.into_response()
}
Ok(_) => {
StatusCode::NO_CONTENT.into_response()
}
Err(err) => {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": format!("DB error: {}", err)})),
)
.into_response()
}
}
}
pub async fn request_human_feedback(
Path((namespace, name)): Path<(String, String)>,
State(state): State<AppState>,
Extension(user_profile): Extension<V1UserProfile>,
Json(feedback_request): Json<V1FeedbackRequest>,
) -> impl IntoResponse {
let db = state.db_pool.clone();
let mut owner_ids: Vec<String> = user_profile
.organizations
.as_ref()
.map(|orgs| orgs.keys().cloned().collect())
.unwrap_or_default();
owner_ids.push(user_profile.email.clone());
let owner_id_refs: Vec<&str> = owner_ids.iter().map(String::as_str).collect();
let feedback_id = ShortUuid::generate().to_string();
let model_opt = match crate::query::Query::find_human_by_name_and_namespace_and_owners(
&state.db_pool,
&name,
&namespace,
&owner_id_refs,
)
.await
{
Ok(val) => val,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": format!("DB error fetching Human: {}", err)})),
)
.into_response();
}
};
let Some(human_model) = model_opt else {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "Human not found"})),
)
.into_response();
};
let resource_ref = format!("{}.{}.Human", name, namespace);
let new_feedback = crate::entities::feedback::ActiveModel {
id: sea_orm::Set(feedback_id.to_string()),
owner_id: sea_orm::Set(human_model.owner_id.clone()),
human_id: sea_orm::Set(human_model.id.clone()),
kind: sea_orm::Set(feedback_request.kind.clone()),
request: sea_orm::Set(
serde_json::to_value(&feedback_request).unwrap_or_else(|_| json!(null)),
),
response: sea_orm::Set(None),
status: sea_orm::Set(None),
created_by: sea_orm::Set(user_profile.email.clone()),
created_at: sea_orm::Set(chrono::Utc::now().into()),
updated_at: sea_orm::Set(chrono::Utc::now().into()),
};
if let Err(err) = new_feedback.insert(&db).await {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": format!("Failed to insert feedback record: {}", err)})),
)
.into_response();
}
match human_model.medium.to_lowercase().as_str() {
"slack" => {
match request_slack_feedback(
&feedback_id,
&human_model,
feedback_request,
&human_model.owner_id,
&resource_ref,
)
.await
{
Ok(()) => StatusCode::OK.into_response(), Err((status, err_json)) => (status, err_json).into_response(),
}
}
_ => (
StatusCode::BAD_REQUEST,
Json(json!({"error": "Currently, only Slack is supported."})),
)
.into_response(),
}
}
async fn request_slack_feedback(
feedback_id: &str,
human_model: &crate::entities::human::Model,
feedback_request: V1FeedbackRequest,
owner: &str,
resource_ref: &str,
) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
let Some(ref slack_channel) = human_model.channel else {
return Err((
StatusCode::BAD_REQUEST,
Json(json!({ "error": "Slack channel not specified" })),
));
};
if feedback_request.kind.to_lowercase() != "approval" {
return Err((
StatusCode::BAD_REQUEST,
Json(json!({ "error": "Unsupported feedback request kind" })),
));
}
let Some(request_kind) = feedback_request.request else {
return Err((
StatusCode::BAD_REQUEST,
Json(json!({"error": "No request data found in feedback request"})),
));
};
#[allow(irrefutable_let_patterns)]
let V1FeedbackRequestKind::V1ApprovalRequest(approval_data) = request_kind
else {
return Err((
StatusCode::BAD_REQUEST,
Json(json!({"error": "Request kind did not contain V1ApprovalRequest"})),
));
};
let content = approval_data.content;
let messages = match approval_data.messages {
Some(messages) => messages,
None => {
return Err((
StatusCode::BAD_REQUEST,
Json(json!({"error": "Messages field is required but was not provided"})),
))
}
};
debug!("Asking for approval of messages: {:?}", messages);
let result = match ask_chat_approval(
slack_channel,
&messages,
&content,
feedback_id,
resource_ref,
owner,
)
.await
{
Ok(_) => Ok(()),
Err(err) => Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": format!("Slack post failed: {}", err)})),
)),
};
match result {
Ok(_) => Ok(()), Err((status, json_err)) => Err((status, json_err)),
}
}
pub async fn list_human_feedback(
Path((namespace, name)): Path<(String, String)>,
State(state): State<AppState>,
Extension(user_profile): Extension<V1UserProfile>,
) -> impl IntoResponse {
todo!()
}
pub async fn record_human_response(
Path((namespace, name, id)): Path<(String, String, String)>,
State(state): State<AppState>,
Extension(user_profile): Extension<V1UserProfile>,
Json(feedback_response): Json<V1FeedbackResponse>,
) -> impl IntoResponse {
debug!(
"Recording human response for feedback: {:?}",
feedback_response
);
let mut owner_ids: Vec<String> = if let Some(orgs) = &user_profile.organizations {
orgs.keys().cloned().collect()
} else {
Vec::new()
};
owner_ids.push(user_profile.email.clone());
debug!("Getting orign agent key");
let orign_agent_key = match get_orign_agent_key(&user_profile).await {
Ok(key) => key,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Failed to get orign agent key: {}", e) })),
));
}
};
debug!("Orign agent key: {}", orign_agent_key);
match process_human_response(
&state.db_pool,
&owner_ids,
&orign_agent_key,
&namespace,
&name,
&id,
feedback_response,
)
.await
{
Ok(_) => Ok(StatusCode::OK.into_response()),
Err((status, json)) => Err((status, json)),
}
}
pub async fn process_human_response(
db: &DatabaseConnection,
owner_ids: &Vec<String>,
agent_key: &str,
namespace: &str,
name: &str,
id: &str,
feedback_response: V1FeedbackResponse,
) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
let owner_id_refs: Vec<&str> = owner_ids.iter().map(String::as_str).collect();
let human_opt = match Query::find_human_by_name_and_namespace_and_owners(
&db,
name,
namespace,
&owner_id_refs,
)
.await
{
Ok(h) => h,
Err(err) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("DB error fetching Human: {}", err) })),
));
}
};
let Some(human_model) = human_opt else {
return Err((
StatusCode::NOT_FOUND,
Json(json!({ "error": "Human not found" })),
));
};
let feedback_opt = match Query::find_feedback_by_id_and_human_id(&db, id, &human_model.id).await
{
Ok(fb) => fb,
Err(err) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("DB error fetching Feedback: {}", err) })),
));
}
};
let Some(feedback_model) = feedback_opt else {
return Err((
StatusCode::NOT_FOUND,
Json(json!({ "error": "Feedback not found" })),
));
};
debug!("Feedback model: {:?}", feedback_model);
let response_value = serde_json::to_value(&feedback_response).unwrap_or_else(|_| json!(null));
let mut feedback_active_model = feedback_model.into_active_model();
feedback_active_model.response = Set(Some(response_value.clone()));
feedback_active_model.updated_at = Set(Utc::now().into());
let feedback_id = feedback_active_model.id.clone();
if let Err(err) = feedback_active_model.update(db).await {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Failed to update Feedback: {}", err) })),
));
}
debug!("Feedback updated");
let feedback_value_str = response_value.to_string();
debug!("feedback_value_str: {}", feedback_value_str);
debug!("Kicking off container");
let nebulous_client = match NebulousClient::new_from_config() {
Ok(client) => client,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Failed to create nebulous client: {}", e) })),
));
}
};
let mut container_request: V1ContainerRequest =
serde_json::from_value(human_model.response_job.unwrap_or_default()).unwrap_or_default();
let mut container_env = match container_request.env {
Some(env) => env,
None => Vec::new(),
};
let feedback_id_str = match feedback_id.clone() {
Set(v) | Unchanged(v) => v,
NotSet => String::new(),
};
container_env.push(V1EnvVar {
key: "FEEDBACK_ID".to_string(),
value: Some(feedback_id_str),
secret_name: None,
});
container_env.push(V1EnvVar {
key: "FEEDBACK_RESPONSE".to_string(),
value: Some(feedback_value_str),
secret_name: None,
});
container_env.push(V1EnvVar {
key: "ORIGN_API_KEY".to_string(),
value: Some(agent_key.to_string()),
secret_name: None,
});
container_request.env = Some(container_env);
let container = match nebulous_client.create_container(&container_request).await {
Ok(c) => c,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("Failed to create container: {}", e) })),
));
}
};
debug!("Container created: {:?}", container);
Ok(())
}
pub async fn get_human_feedback(
Path((namespace, name, id)): Path<(String, String, String)>,
State(state): State<AppState>,
) -> impl IntoResponse {
todo!()
}