use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use serde::{Deserialize, Serialize};
use super::super::auth::{AppState, ResolvedIdentity};
use crate::event::cdc::consume::{ConsumeError, ConsumeParams, ConsumeResult, consume_stream};
#[derive(Deserialize, Default)]
pub struct PollParams {
pub group: Option<String>,
pub limit: Option<usize>,
pub partition: Option<u32>,
pub tenant_id: Option<u64>,
}
#[derive(Serialize)]
pub struct PollResponse {
pub events: Vec<serde_json::Value>,
pub partition_offsets: std::collections::BTreeMap<String, u64>,
pub count: usize,
pub evicted_since_last_poll: u64,
pub oldest_available_lsn: u64,
}
pub async fn poll_stream(
identity: ResolvedIdentity,
Path(stream_name): Path<String>,
Query(params): Query<PollParams>,
State(state): State<AppState>,
) -> impl IntoResponse {
if params.tenant_id.is_some() {
return (
StatusCode::FORBIDDEN,
Json(serde_json::json!({
"error": "tenant_id must not be supplied as a query parameter; \
tenant is determined from the bearer token"
})),
)
.into_response();
}
let group = match params.group {
Some(g) => g.to_lowercase(),
None => {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": "missing 'group' query parameter"})),
)
.into_response();
}
};
let tenant_id = identity.tenant_id().as_u64();
let limit = params.limit.unwrap_or(100).min(10_000);
let stream_name = stream_name.to_lowercase();
let consume_params = ConsumeParams {
tenant_id,
stream_name: &stream_name,
group_name: &group,
partition: params.partition,
limit,
};
let result = match consume_stream(&state.shared, &consume_params) {
Ok(r) => r,
Err(ConsumeError::RemotePartition { leader_node, .. }) => {
match crate::event::cdc::consume::consume_remote(
&state.shared,
&consume_params,
leader_node,
)
.await
{
Ok(r) => r,
Err(e) => {
return (
StatusCode::BAD_GATEWAY,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response();
}
}
}
Err(ConsumeError::BufferEmpty(_)) => ConsumeResult {
events: Vec::new(),
partition_offsets: Vec::new(),
evicted_since_last_poll: 0,
oldest_available_lsn: 0,
},
Err(e) => {
return (
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response();
}
};
let events: Vec<serde_json::Value> = result
.events
.iter()
.map(|e| serde_json::to_value(e).unwrap_or_default())
.collect();
let count = events.len();
let partition_offsets: std::collections::BTreeMap<String, u64> = result
.partition_offsets
.into_iter()
.map(|(pid, lsn)| (pid.to_string(), lsn))
.collect();
Json(PollResponse {
events,
partition_offsets,
count,
evicted_since_last_poll: result.evicted_since_last_poll,
oldest_available_lsn: result.oldest_available_lsn,
})
.into_response()
}