use tracing::debug;
use std::sync::Arc;
use crate::control::state::SharedState;
use crate::event::cdc::event::CdcEvent;
/// Parameters for a single consume (poll) request against a stream.
///
/// All fields are `Copy`, so the struct itself is cheap to copy and debug-print.
#[derive(Debug, Clone, Copy)]
pub struct ConsumeParams<'a> {
    /// Tenant the request is scoped to.
    pub tenant_id: u64,
    /// Change-stream name, or `topic:<name>` to address an event-plane topic.
    pub stream_name: &'a str,
    /// Consumer group whose stored offsets decide where reading resumes.
    pub group_name: &'a str,
    /// Read a single partition; `None` reads across all partitions.
    pub partition: Option<u32>,
    /// Limit passed to the buffer read and to the SQL `LIMIT` clause
    /// (presumably the maximum number of events returned).
    pub limit: usize,
}
pub struct ConsumeResult {
pub events: Vec<Arc<CdcEvent>>,
pub partition_offsets: Vec<(u32, u64)>,
pub evicted_since_last_poll: u64,
pub oldest_available_lsn: u64,
}
/// Validate a consume request and serve it from this node when possible.
///
/// `params.stream_name` may name a change stream directly or, with a `topic:` prefix,
/// a topic registered in `ep_topic_registry`. The consumer group is accepted if it is
/// registered under either the full name or the prefix-stripped name.
///
/// # Errors
/// - [`ConsumeError::StreamNotFound`] when neither a stream nor a topic matches.
/// - [`ConsumeError::GroupNotFound`] when the group is registered under neither name.
/// - [`ConsumeError::RemotePartition`] when a specific partition was requested and its
///   leader is another node; the caller is expected to forward the request.
/// - Any error returned by [`consume_local`].
pub fn consume_stream(
    state: &SharedState,
    params: &ConsumeParams<'_>,
) -> Result<ConsumeResult, ConsumeError> {
    let tenant = params.tenant_id;
    let full_name = params.stream_name;
    let topic_name = full_name.strip_prefix("topic:");

    let known_stream = state.stream_registry.get(tenant, full_name).is_some();
    let known_topic = match topic_name {
        Some(bare) => state.ep_topic_registry.get(tenant, bare).is_some(),
        None => false,
    };
    if !known_stream && !known_topic {
        return Err(ConsumeError::StreamNotFound(full_name.to_string()));
    }

    // Without a `topic:` prefix the bare name is simply the full name.
    let bare_stream = topic_name.unwrap_or(full_name);
    let group_known = state
        .group_registry
        .get(tenant, full_name, params.group_name)
        .is_some()
        || state
            .group_registry
            .get(tenant, bare_stream, params.group_name)
            .is_some();
    if !group_known {
        return Err(ConsumeError::GroupNotFound(
            params.group_name.to_string(),
            full_name.to_string(),
        ));
    }

    // Only an explicit partition request can be redirected to another node.
    let remote = params.partition.and_then(|partition_id| {
        remote_partition_leader(state, partition_id).map(|node| (partition_id, node))
    });
    if let Some((partition_id, remote_node)) = remote {
        debug!(
            partition = partition_id,
            remote_node,
            stream = params.stream_name,
            "partition is remote — forwarding consume request"
        );
        return Err(ConsumeError::RemotePartition {
            partition_id,
            leader_node: remote_node,
        });
    }

    consume_local(state, params)
}
/// Serve a consume request from this node's CDC buffer.
///
/// With `params.partition` set, reads that partition starting at the group's stored offset
/// for it. Otherwise reads across all partitions, starting at the smallest `committed_lsn`
/// the group has recorded (0 when it has none).
///
/// After reading, the buffer's total eviction count is swapped into the offset store's
/// baseline for this group; the swap's return value is reported as `evicted_since_last_poll`.
///
/// # Errors
/// Returns [`ConsumeError::BufferEmpty`] when no buffer is registered for the stream.
pub fn consume_local(
    state: &SharedState,
    params: &ConsumeParams<'_>,
) -> Result<ConsumeResult, ConsumeError> {
    let (tenant, stream, group) = (params.tenant_id, params.stream_name, params.group_name);

    let Some(buffer) = state.cdc_router.get_buffer(tenant, stream) else {
        return Err(ConsumeError::BufferEmpty(stream.to_string()));
    };

    let events = match params.partition {
        Some(partition_id) => {
            let resume_at = state
                .offset_store
                .get_offset(tenant, stream, group, partition_id);
            buffer.read_partition_from_lsn(partition_id, resume_at, params.limit)
        }
        None => {
            // Start from the least-advanced committed offset across the group's partitions.
            let resume_at = state
                .offset_store
                .get_all_offsets(tenant, stream, group)
                .iter()
                .map(|o| o.committed_lsn)
                .min()
                .unwrap_or(0);
            buffer.read_from_lsn(resume_at, params.limit)
        }
    };

    // Highest LSN per partition among the returned events, ordered by partition id.
    let mut high_water: std::collections::BTreeMap<u32, u64> = std::collections::BTreeMap::new();
    for event in &events {
        high_water
            .entry(event.partition)
            .and_modify(|lsn| *lsn = (*lsn).max(event.lsn))
            .or_insert(event.lsn);
    }

    let evicted_since_last_poll = state.offset_store.swap_eviction_baseline(
        tenant,
        stream,
        group,
        buffer.total_evicted(),
    );
    let oldest_available_lsn = buffer.earliest_lsn().unwrap_or(0);

    Ok(ConsumeResult {
        events,
        partition_offsets: high_water.into_iter().collect(),
        evicted_since_last_poll,
        oldest_available_lsn,
    })
}
/// Return the leader node for `partition_id` when that leader is some node other than this one.
///
/// The partition id is used directly as the vshard id. Returns `None` when cluster routing
/// is not configured, when `leader_for_vshard` returns an error, or when the leader is this
/// node (`state.node_id`) or node id 0.
fn remote_partition_leader(state: &SharedState, partition_id: u32) -> Option<u64> {
    let routing = state
        .cluster_routing
        .as_ref()?
        .read()
        // A poisoned lock still holds the last routing table; read through it.
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let leader = routing.leader_for_vshard(partition_id).ok()?;
    let is_remote = leader != 0 && leader != state.node_id;
    is_remote.then_some(leader)
}
pub fn build_consume_sql(params: &ConsumeParams<'_>) -> String {
if let Some(partition_id) = params.partition {
format!(
"SELECT * FROM STREAM {} PARTITION {} CONSUMER GROUP {} LIMIT {}",
params.stream_name, partition_id, params.group_name, params.limit
)
} else {
format!(
"SELECT * FROM STREAM {} CONSUMER GROUP {} LIMIT {}",
params.stream_name, params.group_name, params.limit
)
}
}
/// Read events for `params` by running a stream `SELECT` through the cluster gateway.
///
/// The SQL is produced by [`build_consume_sql`]. The closure passed to `execute_sql`
/// plans it on this node and yields the first physical task's plan. `_leader_node` is
/// not used here; routing is left to the gateway.
///
/// Only the first returned payload is decoded, as a MessagePack `Vec<CdcEvent>`.
/// This path does not report buffer eviction state, so `evicted_since_last_poll` and
/// `oldest_available_lsn` are always 0.
///
/// # Errors
/// - [`ConsumeError::NoClusterTransport`] when `state.gateway` is `None`.
/// - [`ConsumeError::RemoteError`] when planning or gateway execution fails, or when the
///   returned payload cannot be decoded.
///
/// NOTE(review): the planning closure calls `tokio::task::block_in_place`, which panics
/// on a current-thread Tokio runtime; this path assumes a multi-threaded runtime.
pub async fn consume_remote(
    state: &SharedState,
    params: &ConsumeParams<'_>,
    _leader_node: u64,
) -> Result<ConsumeResult, ConsumeError> {
    let gateway = state
        .gateway
        .as_ref()
        .ok_or(ConsumeError::NoClusterTransport)?;
    let sql = build_consume_sql(params);
    let tenant_id = params.tenant_id;
    let gw_ctx = crate::control::gateway::core::QueryContext {
        tenant_id: crate::types::TenantId::new(tenant_id),
        trace_id: nodedb_types::TraceId::generate(),
        database_id: nodedb_types::id::DatabaseId::DEFAULT,
    };
    let query_ctx = crate::control::planner::context::QueryContext::for_state(state);
    let payloads = gateway
        .execute_sql(&gw_ctx, &sql, &[], || {
            // The planner is async but this callback is sync, so block on it in place.
            let tasks = tokio::task::block_in_place(|| {
                tokio::runtime::Handle::current().block_on(query_ctx.plan_sql(
                    &sql,
                    crate::types::TenantId::new(tenant_id),
                    crate::types::DatabaseId::DEFAULT,
                ))
            })
            .map_err(|e| crate::Error::PlanError {
                detail: e.to_string(),
            })?;
            tasks
                .into_iter()
                .next()
                .map(|t| t.plan)
                .ok_or_else(|| crate::Error::PlanError {
                    detail: "stream SELECT produced no physical tasks".into(),
                })
        })
        .await
        .map_err(|e| ConsumeError::RemoteError(e.to_string()))?;

    // NOTE(review): only the first payload is decoded; confirm the gateway never splits a
    // stream SELECT result across several payloads.
    let events: Vec<Arc<CdcEvent>> = match payloads.first() {
        Some(payload) => zerompk::from_msgpack::<Vec<CdcEvent>>(payload)
            // A decode failure must not look like an empty stream to the consumer.
            .map_err(|e| {
                ConsumeError::RemoteError(format!("failed to decode remote stream payload: {e}"))
            })?
            .into_iter()
            .map(Arc::new)
            .collect(),
        None => Vec::new(),
    };

    // Highest LSN per partition among the returned events, ordered by partition id.
    let mut partition_offsets: std::collections::BTreeMap<u32, u64> =
        std::collections::BTreeMap::new();
    for e in &events {
        let entry = partition_offsets.entry(e.partition).or_insert(0);
        if e.lsn > *entry {
            *entry = e.lsn;
        }
    }
    Ok(ConsumeResult {
        events,
        partition_offsets: partition_offsets.into_iter().collect(),
        evicted_since_last_poll: 0,
        oldest_available_lsn: 0,
    })
}
/// Errors returned by the stream consume entry points in this module.
#[derive(Debug)]
pub enum ConsumeError {
    /// No change stream (and no `topic:`-prefixed topic) with this name exists.
    StreamNotFound(String),
    /// The consumer group (first field) is not registered on the stream (second field).
    GroupNotFound(String, String),
    /// No CDC buffer is registered for the stream on this node.
    BufferEmpty(String),
    /// The requested partition is led by another node; the request should be forwarded there.
    RemotePartition {
        partition_id: u32,
        leader_node: u64,
    },
    /// A remote read failed; carries the underlying error text.
    RemoteError(String),
    /// A remote read was attempted but no gateway is configured on this node.
    NoClusterTransport,
}

impl std::fmt::Display for ConsumeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::StreamNotFound(s) => write!(f, "change stream '{s}' does not exist"),
            Self::GroupNotFound(g, s) => {
                write!(f, "consumer group '{g}' does not exist on stream '{s}'")
            }
            Self::BufferEmpty(s) => write!(f, "stream '{s}' has no buffered events"),
            Self::RemotePartition {
                partition_id,
                leader_node,
            } => {
                write!(
                    f,
                    "partition {partition_id} is on remote node {leader_node}"
                )
            }
            Self::RemoteError(e) => write!(f, "remote consume error: {e}"),
            Self::NoClusterTransport => write!(f, "gateway not available for remote stream read"),
        }
    }
}

/// Lets `ConsumeError` be boxed as `dyn Error` and used with `?` in generic error contexts.
/// All variants carry plain data, so there is no underlying `source()`.
impl std::error::Error for ConsumeError {}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn consume_error_display() {
        let e = ConsumeError::StreamNotFound("orders".into());
        assert!(e.to_string().contains("orders"));
    }

    #[test]
    fn group_not_found_error_display() {
        let e = ConsumeError::GroupNotFound("analytics".into(), "orders".into());
        let msg = e.to_string();
        assert!(msg.contains("analytics"));
        assert!(msg.contains("orders"));
    }

    #[test]
    fn remote_partition_error_display() {
        let e = ConsumeError::RemotePartition {
            partition_id: 5,
            leader_node: 3,
        };
        assert!(e.to_string().contains("partition 5"));
        assert!(e.to_string().contains("node 3"));
    }

    #[test]
    fn build_consume_sql_with_partition() {
        let params = ConsumeParams {
            tenant_id: 1,
            stream_name: "orders_stream",
            group_name: "analytics",
            partition: Some(5),
            limit: 100,
        };
        let sql = build_consume_sql(&params);
        assert_eq!(
            sql,
            "SELECT * FROM STREAM orders_stream PARTITION 5 CONSUMER GROUP analytics LIMIT 100"
        );
    }

    #[test]
    fn build_consume_sql_all_partitions() {
        let params = ConsumeParams {
            tenant_id: 1,
            stream_name: "orders_stream",
            group_name: "analytics",
            partition: None,
            limit: 50,
        };
        let sql = build_consume_sql(&params);
        assert_eq!(
            sql,
            "SELECT * FROM STREAM orders_stream CONSUMER GROUP analytics LIMIT 50"
        );
    }

    /// Without cluster routing configured, no partition is ever treated as remote.
    #[tokio::test]
    async fn single_node_no_remote() {
        let dir = tempfile::tempdir().unwrap();
        let (_, _, state, _, _) = crate::event::test_utils::event_test_deps(&dir);
        assert!(remote_partition_leader(&state, 5).is_none());
    }
}