use axum::{
Json,
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
};
use fraiseql_core::security::SecurityContext;
use uuid::Uuid;
use super::{
CreateObserverRequest, ListObserverLogsQuery, ListObserversQuery, ObserverRepository,
PaginatedResponse, UpdateObserverRequest,
};
use crate::extractors::OptionalSecurityContext;
/// Shared state handed to the observer handlers through axum's `State` extractor.
#[derive(Clone)]
pub struct ObserverState {
/// Repository the handlers call for every observer, stats, and log operation.
pub repository: ObserverRepository,
}
pub async fn list_observers(
State(state): State<ObserverState>,
OptionalSecurityContext(security_context): OptionalSecurityContext,
Query(query): Query<ListObserversQuery>,
) -> impl IntoResponse {
let customer_org: Option<i64> = extract_customer_org(security_context.as_ref());
match state.repository.list(&query, customer_org).await {
Ok((observers, total_count)) => {
let response =
PaginatedResponse::new(observers, query.page, query.page_size, total_count);
(StatusCode::OK, Json(response)).into_response()
},
Err(e) => {
let error_msg = e.to_string();
tracing::error!("Failed to list observers: {}", error_msg);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": error_msg })),
)
.into_response()
},
}
}
pub async fn get_observer(
State(state): State<ObserverState>,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let customer_org: Option<i64> = None;
match state.repository.get_by_id(id, customer_org).await {
Ok(Some(observer)) => (StatusCode::OK, Json(observer)).into_response(),
Ok(None) => (
StatusCode::NOT_FOUND,
Json(serde_json::json!({ "error": "Observer not found" })),
)
.into_response(),
Err(e) => {
let error_msg = e.to_string();
tracing::error!("Failed to get observer {}: {}", id, error_msg);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": error_msg })),
)
.into_response()
},
}
}
/// Creates a new observer from the JSON request body.
///
/// The request is validated before the repository is touched:
/// * `name` must contain at least one non-whitespace character,
/// * `actions` must not be empty,
/// * `event_type`, when supplied, must be one of `INSERT`, `UPDATE`, `DELETE`, `CUSTOM`.
///
/// The tenant (`customer_org`) and the acting user are taken from the optional
/// security context and forwarded to the repository.
///
/// # Responses
/// * `201 Created` – the created observer as JSON.
/// * `400 Bad Request` – one of the validation rules above failed.
/// * `409 Conflict` – the repository error text contains `"already exists"`.
/// * `500 Internal Server Error` – any other repository failure.
pub async fn create_observer(
    State(state): State<ObserverState>,
    OptionalSecurityContext(security_context): OptionalSecurityContext,
    Json(request): Json<CreateObserverRequest>,
) -> impl IntoResponse {
    // A whitespace-only name is as unusable as an empty one, so reject both.
    // The name itself is forwarded untouched; only the check ignores whitespace.
    if request.name.trim().is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({ "error": "Name is required" })),
        )
            .into_response();
    }

    if request.actions.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({ "error": "At least one action is required" })),
        )
            .into_response();
    }

    if let Some(ref event_type) = request.event_type {
        let valid_types = ["INSERT", "UPDATE", "DELETE", "CUSTOM"];
        if !valid_types.contains(&event_type.as_str()) {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": format!("Invalid event_type '{}'. Must be one of: {:?}", event_type, valid_types)
                })),
            )
                .into_response();
        }
    }

    let customer_org: Option<i64> = extract_customer_org(security_context.as_ref());
    let created_by: Option<&str> = extract_user_id(security_context.as_ref());

    match state.repository.create(&request, customer_org, created_by).await {
        Ok(observer) => (StatusCode::CREATED, Json(observer)).into_response(),
        Err(e) => {
            let error_msg = e.to_string();
            tracing::error!("Failed to create observer: {}", error_msg);
            // Duplicate detection relies on the error text, not on a typed error variant.
            let status = if error_msg.contains("already exists") {
                StatusCode::CONFLICT
            } else {
                StatusCode::INTERNAL_SERVER_ERROR
            };
            (status, Json(serde_json::json!({ "error": error_msg }))).into_response()
        },
    }
}
pub async fn update_observer(
State(state): State<ObserverState>,
OptionalSecurityContext(security_context): OptionalSecurityContext,
Path(id): Path<Uuid>,
Json(request): Json<UpdateObserverRequest>,
) -> impl IntoResponse {
if let Some(ref event_type) = request.event_type {
let valid_types = ["INSERT", "UPDATE", "DELETE", "CUSTOM"];
if !valid_types.contains(&event_type.as_str()) {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": format!("Invalid event_type '{}'. Must be one of: {:?}", event_type, valid_types)
})),
)
.into_response();
}
}
let customer_org: Option<i64> = extract_customer_org(security_context.as_ref());
let updated_by: Option<&str> = extract_user_id(security_context.as_ref());
match state.repository.update(id, &request, customer_org, updated_by).await {
Ok(Some(observer)) => (StatusCode::OK, Json(observer)).into_response(),
Ok(None) => (
StatusCode::NOT_FOUND,
Json(serde_json::json!({ "error": "Observer not found" })),
)
.into_response(),
Err(e) => {
let error_msg = e.to_string();
tracing::error!("Failed to update observer {}: {}", id, error_msg);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": error_msg })),
)
.into_response()
},
}
}
pub async fn delete_observer(
State(state): State<ObserverState>,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let customer_org: Option<i64> = None;
match state.repository.delete(id, customer_org).await {
Ok(true) => StatusCode::NO_CONTENT.into_response(),
Ok(false) => (
StatusCode::NOT_FOUND,
Json(serde_json::json!({ "error": "Observer not found" })),
)
.into_response(),
Err(e) => {
let error_msg = e.to_string();
tracing::error!("Failed to delete observer {}: {}", id, error_msg);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": error_msg })),
)
.into_response()
},
}
}
pub async fn get_observer_stats(
State(state): State<ObserverState>,
observer_id: Option<Path<Uuid>>,
) -> impl IntoResponse {
let customer_org: Option<i64> = None;
let id = observer_id.map(|p| p.0);
match state.repository.get_stats(id, customer_org).await {
Ok(stats) => (StatusCode::OK, Json(stats)).into_response(),
Err(e) => {
let error_msg = e.to_string();
tracing::error!("Failed to get observer stats: {}", error_msg);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": error_msg })),
)
.into_response()
},
}
}
/// Lists observer execution logs as a paginated JSON response.
///
/// The observer UUID in the path is optional. When the matched route carries one it
/// overrides any `observer_id` supplied in the query string; otherwise the query is
/// forwarded to the repository unchanged.
///
/// The optional path parameter is taken as `Option<Path<Uuid>>` rather than
/// `Path<Option<Uuid>>`: the `Path` extractor rejects requests on routes that have no
/// path parameters, and a top-level `Option` is not expected to be a supported
/// deserialization target for it. Wrapping the extractor itself in `Option` lets the
/// same handler serve routes with and without the id segment.
///
/// # Responses
/// * `200 OK` – a [`PaginatedResponse`] built from the returned rows, the requested
///   `page` / `page_size`, and the total count reported by the repository.
/// * `500 Internal Server Error` – `{ "error": <message> }` when the repository call fails.
pub async fn list_observer_logs(
    State(state): State<ObserverState>,
    observer_id: Option<Path<Uuid>>,
    Query(mut query): Query<ListObserverLogsQuery>,
) -> impl IntoResponse {
    // A path-supplied id takes precedence over the query-string filter.
    if let Some(Path(id)) = observer_id {
        query.observer_id = Some(id);
    }

    // NOTE(review): `customer_org` is hard-coded to `None`, so the security context is
    // never consulted here — confirm whether skipping tenant scoping is intentional.
    let customer_org: Option<i64> = None;

    match state.repository.list_logs(&query, customer_org).await {
        Ok((logs, total_count)) => {
            let response = PaginatedResponse::new(logs, query.page, query.page_size, total_count);
            (StatusCode::OK, Json(response)).into_response()
        },
        Err(e) => {
            let error_msg = e.to_string();
            tracing::error!("Failed to list observer logs: {}", error_msg);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({ "error": error_msg })),
            )
                .into_response()
        },
    }
}
pub async fn enable_observer(
State(state): State<ObserverState>,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let request = UpdateObserverRequest {
enabled: Some(true),
..Default::default()
};
let customer_org: Option<i64> = None;
let updated_by: Option<&str> = None;
match state.repository.update(id, &request, customer_org, updated_by).await {
Ok(Some(observer)) => (StatusCode::OK, Json(observer)).into_response(),
Ok(None) => (
StatusCode::NOT_FOUND,
Json(serde_json::json!({ "error": "Observer not found" })),
)
.into_response(),
Err(e) => {
let error_msg = e.to_string();
tracing::error!("Failed to enable observer {}: {}", id, error_msg);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": error_msg })),
)
.into_response()
},
}
}
pub async fn disable_observer(
State(state): State<ObserverState>,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let request = UpdateObserverRequest {
enabled: Some(false),
..Default::default()
};
let customer_org: Option<i64> = None;
let updated_by: Option<&str> = None;
match state.repository.update(id, &request, customer_org, updated_by).await {
Ok(Some(observer)) => (StatusCode::OK, Json(observer)).into_response(),
Ok(None) => (
StatusCode::NOT_FOUND,
Json(serde_json::json!({ "error": "Observer not found" })),
)
.into_response(),
Err(e) => {
let error_msg = e.to_string();
tracing::error!("Failed to disable observer {}: {}", id, error_msg);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": error_msg })),
)
.into_response()
},
}
}
use std::sync::Arc;
use tokio::sync::RwLock;
/// Shared state for the runtime health and reload handlers.
#[derive(Clone)]
pub struct RuntimeHealthState {
/// Handle to the observer runtime, shared behind an async read/write lock.
pub runtime: Arc<RwLock<super::ObserverRuntime>>,
}
/// Reports the observer runtime's health as JSON.
///
/// Takes the read side of the runtime lock, asks the runtime for its health snapshot,
/// and echoes the `running`, `observer_count`, `last_checkpoint`, `events_processed`
/// and `errors` fields.
///
/// # Responses
/// * `200 OK` – the runtime reports `running == true`.
/// * `503 Service Unavailable` – the runtime reports `running == false`.
pub async fn get_runtime_health(State(state): State<RuntimeHealthState>) -> impl IntoResponse {
    let guard = state.runtime.read().await;
    let snapshot = guard.health();

    let body = serde_json::json!({
        "running": snapshot.running,
        "observer_count": snapshot.observer_count,
        "last_checkpoint": snapshot.last_checkpoint,
        "events_processed": snapshot.events_processed,
        "errors": snapshot.errors
    });

    // The HTTP status mirrors the `running` flag so probes can rely on the code alone.
    let code = if snapshot.running {
        StatusCode::OK
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    };

    (code, Json(body)).into_response()
}
pub async fn reload_observers(State(state): State<RuntimeHealthState>) -> impl IntoResponse {
let runtime = state.runtime.read().await;
match runtime.reload_observers().await {
Ok(count) => (
StatusCode::OK,
Json(serde_json::json!({
"message": "Observers reloaded successfully",
"count": count
})),
)
.into_response(),
Err(e) => {
let error_msg = e.to_string();
tracing::error!("Failed to reload observers: {}", error_msg);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": error_msg })),
)
.into_response()
},
}
}
/// Derives the numeric tenant id (`customer_org`) from an optional security context.
///
/// Returns `None` when there is no context, when the context has no `tenant_id`, or
/// when the `tenant_id` does not parse as an `i64`.
///
/// NOTE(review): an unparsable tenant id is indistinguishable from "no tenant" for
/// callers — confirm that the repository does not treat `None` as "all tenants".
#[must_use]
fn extract_customer_org(ctx: Option<&SecurityContext>) -> Option<i64> {
    ctx.and_then(|context| context.tenant_id.as_deref())
        .and_then(|tenant| tenant.parse::<i64>().ok())
}
/// Borrows the user id from an optional security context.
///
/// Returns `None` only when no context is present; the returned slice borrows from
/// the context itself.
#[must_use]
fn extract_user_id(ctx: Option<&SecurityContext>) -> Option<&str> {
    ctx.map(|context| context.user_id.as_str())
}