use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use sonic_rs;
use tracing::debug;
use crate::control::state::SharedState;
use crate::event::cdc::buffer::StreamBuffer;
use crate::event::cdc::event::CdcEvent;
use crate::event::cdc::stream_def::RetentionConfig;
pub fn publish_to_topic(
state: &SharedState,
tenant_id: u64,
topic_name: &str,
payload: &str,
) -> Result<u64, PublishError> {
let topic = state
.ep_topic_registry
.get(tenant_id, topic_name)
.ok_or_else(|| PublishError::TopicNotFound(topic_name.to_string()))?;
if let Some(leader) = topic_home_node(state, topic_name)
&& leader != state.node_id
{
debug!(
topic = topic_name,
home_node = leader,
"topic home is remote — forwarding publish"
);
return Err(PublishError::RemoteHome {
topic_name: topic_name.to_string(),
leader_node: leader,
});
}
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let value: serde_json::Value =
sonic_rs::from_str(payload).unwrap_or_else(|_| serde_json::json!({"message": payload}));
let buffer = get_or_create_topic_buffer(state, tenant_id, topic_name, &topic.retention);
let sequence = buffer.total_pushed() + 1;
let event = CdcEvent {
sequence,
partition: 0, collection: format!("topic:{topic_name}"),
op: "PUBLISH".into(),
row_id: format!("msg-{sequence}"),
event_time: now_ms,
lsn: now_ms, tenant_id,
new_value: Some(value),
old_value: None,
schema_version: 0,
field_diffs: None,
system_time_ms: None,
valid_time_ms: None,
};
buffer.push(event);
Ok(sequence)
}
fn get_or_create_topic_buffer(
state: &SharedState,
tenant_id: u64,
topic_name: &str,
retention: &RetentionConfig,
) -> Arc<StreamBuffer> {
let buffer_key = format!("topic:{topic_name}");
if let Some(buf) = state.cdc_router.get_buffer(tenant_id, &buffer_key) {
return buf;
}
state
.cdc_router
.ensure_buffer(tenant_id, &buffer_key, retention)
}
fn topic_home_node(state: &SharedState, topic_name: &str) -> Option<u64> {
let routing_lock = state.cluster_routing.as_ref()?;
let vshard_id = nodedb_cluster::routing::vshard_for_collection(
nodedb_types::id::DatabaseId::DEFAULT,
topic_name,
);
let routing = routing_lock.read().unwrap_or_else(|p| p.into_inner());
routing.leader_for_vshard(vshard_id).ok()
}
pub async fn publish_remote(
state: &SharedState,
tenant_id: u64,
topic_name: &str,
payload: &str,
_leader_node: u64,
) -> Result<u64, PublishError> {
let gateway = state
.gateway
.as_ref()
.ok_or_else(|| PublishError::RemoteError("gateway not available".into()))?;
let sql = format!(
"PUBLISH TO {} '{}'",
topic_name,
payload.replace('\'', "''") );
let gw_ctx = crate::control::gateway::core::QueryContext {
tenant_id: crate::types::TenantId::new(tenant_id),
trace_id: nodedb_types::TraceId::generate(),
database_id: nodedb_types::id::DatabaseId::DEFAULT,
};
let query_ctx = crate::control::planner::context::QueryContext::for_state(state);
gateway
.execute_sql(&gw_ctx, &sql, &[], || {
let tasks = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(query_ctx.plan_sql(
&sql,
crate::types::TenantId::new(tenant_id),
crate::types::DatabaseId::DEFAULT,
))
})
.map_err(|e| crate::Error::PlanError {
detail: e.to_string(),
})?;
tasks
.into_iter()
.next()
.map(|t| t.plan)
.ok_or_else(|| crate::Error::PlanError {
detail: "PUBLISH produced no physical tasks".into(),
})
})
.await
.map_err(|e| PublishError::RemoteError(e.to_string()))?;
Ok(0) }
#[derive(Debug)]
pub enum PublishError {
TopicNotFound(String),
RemoteHome {
topic_name: String,
leader_node: u64,
},
RemoteError(String),
}
impl std::fmt::Display for PublishError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TopicNotFound(t) => write!(f, "topic '{t}' does not exist"),
Self::RemoteHome {
topic_name,
leader_node,
} => {
write!(f, "topic '{topic_name}' home is on node {leader_node}")
}
Self::RemoteError(e) => write!(f, "remote publish error: {e}"),
}
}
}