use std::sync::Arc;
use axum::{
Json,
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use uuid::Uuid;
use super::runtime::ObserverRuntime;
#[derive(Clone)]
pub struct DlqState {
pub runtime: Arc<RwLock<ObserverRuntime>>,
}
#[derive(Debug, Serialize)]
pub struct DeliveryStatusSummary {
pub running: bool,
pub observer_count: usize,
pub events_processed: u64,
pub errors: u64,
pub dlq_count: usize,
}
#[derive(Debug, Serialize)]
pub struct DlqItemResponse {
pub id: Uuid,
pub event_id: Uuid,
pub entity_type: String,
pub entity_id: Uuid,
pub event_type: String,
pub action_type: String,
pub error_message: String,
pub attempts: u32,
}
impl From<&fraiseql_observers::DlqItem> for DlqItemResponse {
fn from(item: &fraiseql_observers::DlqItem) -> Self {
Self {
id: item.id,
event_id: item.event.id,
entity_type: item.event.entity_type.clone(),
entity_id: item.event.entity_id,
event_type: item.event.event_type.as_str().to_string(),
action_type: action_type_str(&item.action).to_string(),
error_message: item.error_message.clone(),
attempts: item.attempts,
}
}
}
#[derive(Debug, Serialize)]
pub struct DlqListResponse {
pub items: Vec<DlqItemResponse>,
pub total: usize,
pub limit: usize,
pub offset: usize,
}
#[derive(Debug, Deserialize)]
pub struct DlqListQuery {
#[serde(default = "default_limit")]
pub limit: usize,
#[serde(default)]
pub offset: usize,
pub action: Option<String>,
pub object_type: Option<String>,
}
const fn default_limit() -> usize {
50
}
#[derive(Debug, Serialize)]
pub struct RetryResponse {
pub success: bool,
pub item_id: Uuid,
pub message: String,
}
#[derive(Debug, Serialize)]
pub struct RetryAllResponse {
pub items_retried: usize,
pub items_failed: usize,
pub message: String,
}
pub async fn delivery_health_handler(State(state): State<DlqState>) -> impl IntoResponse {
let runtime = state.runtime.read().await;
let health = runtime.health();
let dlq = runtime.dlq();
let summary = DeliveryStatusSummary {
running: health.running,
observer_count: health.observer_count,
events_processed: health.events_processed,
errors: health.errors,
dlq_count: dlq.count(),
};
(StatusCode::OK, Json(summary))
}
pub async fn dlq_list_handler(
State(state): State<DlqState>,
Query(query): Query<DlqListQuery>,
) -> impl IntoResponse {
let runtime = state.runtime.read().await;
let dlq = runtime.dlq();
let all_items = dlq.list_all();
let filtered: Vec<_> = all_items
.iter()
.filter(|item| {
if let Some(ref action_filter) = query.action {
if action_type_str(&item.action) != action_filter.as_str() {
return false;
}
}
if let Some(ref object_type) = query.object_type {
if item.event.entity_type != *object_type {
return false;
}
}
true
})
.collect();
let total = filtered.len();
let page: Vec<DlqItemResponse> = filtered
.iter()
.skip(query.offset)
.take(query.limit)
.map(|item| DlqItemResponse::from(*item))
.collect();
let response = DlqListResponse {
items: page,
total,
limit: query.limit,
offset: query.offset,
};
(StatusCode::OK, Json(response))
}
pub async fn dlq_get_handler(
State(state): State<DlqState>,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let runtime = state.runtime.read().await;
let dlq = runtime.dlq();
match dlq.get(id) {
Some(item) => {
(StatusCode::OK, Json(serde_json::json!(DlqItemResponse::from(&item)))).into_response()
},
None => (
StatusCode::NOT_FOUND,
Json(serde_json::json!({ "error": "DLQ item not found" })),
)
.into_response(),
}
}
pub async fn dlq_retry_handler(
State(state): State<DlqState>,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let runtime = state.runtime.read().await;
let dlq = runtime.dlq();
let Some(item) = dlq.get(id) else {
return (
StatusCode::NOT_FOUND,
Json(RetryResponse {
success: false,
item_id: id,
message: "DLQ item not found".to_string(),
}),
)
.into_response();
};
let executor_guard = runtime.executor_ref().read().await;
let Some(executor) = executor_guard.as_ref() else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(RetryResponse {
success: false,
item_id: id,
message: "Observer executor not available".to_string(),
}),
)
.into_response();
};
match executor.process_event(&item.event).await {
Ok(_summary) => {
dlq.remove(id);
(
StatusCode::OK,
Json(RetryResponse {
success: true,
item_id: id,
message: "Item re-processed successfully".to_string(),
}),
)
.into_response()
},
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(RetryResponse {
success: false,
item_id: id,
message: format!("Retry failed: {e}"),
}),
)
.into_response(),
}
}
pub async fn dlq_retry_all_handler(State(state): State<DlqState>) -> impl IntoResponse {
let runtime = state.runtime.read().await;
let dlq = runtime.dlq();
let items = dlq.list_all();
if items.is_empty() {
return (
StatusCode::OK,
Json(RetryAllResponse {
items_retried: 0,
items_failed: 0,
message: "No items in DLQ".to_string(),
}),
);
}
let executor_guard = runtime.executor_ref().read().await;
let Some(executor) = executor_guard.as_ref() else {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(RetryAllResponse {
items_retried: 0,
items_failed: items.len(),
message: "Observer executor not available".to_string(),
}),
);
};
let mut retried = 0;
let mut failed = 0;
for item in &items {
match executor.process_event(&item.event).await {
Ok(_) => {
dlq.remove(item.id);
retried += 1;
},
Err(e) => {
tracing::warn!(item_id = %item.id, error = %e, "DLQ retry failed");
failed += 1;
},
}
}
(
StatusCode::OK,
Json(RetryAllResponse {
items_retried: retried,
items_failed: failed,
message: format!("Batch retry completed: {retried} retried, {failed} failed"),
}),
)
}
const fn action_type_str(action: &fraiseql_observers::ActionConfig) -> &'static str {
action.action_type()
}
#[cfg(test)]
mod tests;