use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use crate::bridge::envelope::Payload;
use crate::bridge::envelope::{PhysicalPlan, Priority, Request, Response};
use crate::bridge::physical_plan::{DocumentOp, KvOp, TimeseriesOp};
use crate::control::state::SharedState;
use crate::types::{ReadConsistency, RequestId, TenantId, VShardId};
pub use super::broadcast::broadcast_to_all_cores;
pub use super::graph_dispatch::{cross_core_bfs, cross_core_bfs_with_options};
pub use super::wal_dispatch::wal_append_if_write;
/// Process-wide monotonic counter backing dispatcher-issued request ids.
/// NOTE(review): the 1_000_000 seed presumably keeps dispatcher ids out of
/// some lower reserved range — confirm against id allocation elsewhere.
static DISPATCH_COUNTER: AtomicU64 = AtomicU64::new(1_000_000);
/// Milliseconds elapsed since the Unix epoch, or `0` if the system clock
/// reports a time earlier than the epoch.
fn current_timestamp_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Sends `plan` to the data plane for `tenant_id`/`vshard_id`, awaits the
/// (possibly chunked) response, and — on an `Ok` status — publishes a
/// change-stream event for write operations.
///
/// The flow is:
/// 1. Capture change-stream metadata up front (before `plan` is moved into
///    the `Request`).
/// 2. Register the request with the response tracker, then hand it to the
///    dispatcher.
/// 3. Await responses on the tracker channel, concatenating `partial`
///    payloads until a terminal (non-partial) response arrives, bounded by
///    `tuning.network.default_deadline_secs`.
/// 4. Publish a CDC event for successful writes; timeseries ingests are
///    additionally gated on the collection's `cdc` config flag.
///
/// # Errors
/// * `Error::DeadlineExceeded` — no terminal response within the deadline.
/// * `Error::Dispatch` — the response channel closed before a terminal
///   response arrived.
/// * Whatever the dispatcher's `dispatch` call itself returns.
pub async fn dispatch_to_data_plane(
    shared: &SharedState,
    tenant_id: TenantId,
    vshard_id: VShardId,
    plan: PhysicalPlan,
    trace_id: u64,
) -> crate::Result<Response> {
    // Timeseries plans get a per-collection CDC gate below.
    // NOTE(review): `Scan` is matched here too, but scans never produce
    // write metadata, so in practice only `Ingest` can reach the gate.
    let is_ts_collection = matches!(
        &plan,
        PhysicalPlan::Timeseries(TimeseriesOp::Ingest { .. })
            | PhysicalPlan::Timeseries(TimeseriesOp::Scan { .. })
    );
    // Must be extracted before `plan` is moved into the request below.
    let change_meta = extract_write_metadata(&plan, tenant_id);
    // Relaxed is sufficient: the counter only needs uniqueness, not ordering
    // with respect to any other memory operation.
    let request_id = RequestId::new(DISPATCH_COUNTER.fetch_add(1, Ordering::Relaxed));
    let request = Request {
        request_id,
        tenant_id,
        vshard_id,
        plan,
        deadline: Instant::now() + Duration::from_secs(shared.tuning.network.default_deadline_secs),
        priority: Priority::Normal,
        trace_id,
        consistency: ReadConsistency::Strong,
        idempotency_key: None,
    };
    // Register the receiver BEFORE dispatching so an early response cannot
    // race the registration and get dropped.
    let mut rx = shared.tracker.register(request_id);
    // A poisoned dispatcher mutex is still structurally usable; recover the
    // inner value rather than failing every subsequent dispatch.
    match shared.dispatcher.lock() {
        Ok(mut d) => d.dispatch(request)?,
        Err(poisoned) => poisoned.into_inner().dispatch(request)?,
    };
    // Collect the response, stitching chunked (partial) payloads together
    // until the terminal response arrives or the deadline fires.
    let response = tokio::time::timeout(
        Duration::from_secs(shared.tuning.network.default_deadline_secs),
        async {
            let mut combined_payload: Vec<u8> = Vec::new();
            let mut final_response: Option<Response> = None;
            while let Some(resp) = rx.recv().await {
                if resp.partial {
                    combined_payload.extend_from_slice(&resp.payload);
                } else {
                    if combined_payload.is_empty() {
                        // Single-shot response: pass it through untouched.
                        final_response = Some(resp);
                    } else {
                        // Terminal chunk of a multi-part response: append its
                        // payload and rebuild the response around the full body.
                        combined_payload.extend_from_slice(&resp.payload);
                        final_response = Some(Response {
                            payload: Payload::from_vec(combined_payload),
                            ..resp
                        });
                    }
                    break;
                }
            }
            // `None` here means the channel closed without a terminal response.
            final_response.ok_or(())
        },
    )
    .await
    .map_err(|_| crate::Error::DeadlineExceeded { request_id })?
    .map_err(|_| crate::Error::Dispatch {
        detail: "response channel closed".into(),
    })?;
    // Publish a CDC event only for successful writes. Timeseries ingests are
    // gated on the collection's `cdc` flag; all other writes always publish.
    if response.status == crate::bridge::envelope::Status::Ok
        && let Some((collection, doc_id, op)) = change_meta
    {
        let should_publish = if is_ts_collection {
            is_timeseries_cdc_enabled(shared, tenant_id, &collection)
        } else {
            true
        };
        if should_publish {
            use crate::control::change_stream::ChangeEvent;
            shared.change_stream.publish(ChangeEvent {
                lsn: response.watermark_lsn,
                tenant_id,
                collection,
                document_id: doc_id,
                operation: op,
                timestamp_ms: current_timestamp_ms(),
                // NOTE(review): the `after` image is never populated here —
                // confirm downstream consumers do not expect it for these ops.
                after: None,
            });
        }
    }
    Ok(response)
}
/// Derives change-stream metadata — `(collection, document id, operation)` —
/// for plans that mutate data; returns `None` for read-only plans.
///
/// Bulk and collection-wide operations use `"*"` as a wildcard document id
/// because they affect an unbounded set of documents.
fn extract_write_metadata(
    plan: &PhysicalPlan,
    _tenant_id: TenantId,
) -> Option<(
    String,
    String,
    crate::control::change_stream::ChangeOperation,
)> {
    use crate::control::change_stream::ChangeOperation as Op;
    // Wildcard document id for operations touching many documents at once.
    const WILDCARD: &str = "*";
    // Small builder so each arm stays a one-liner.
    let meta = |name: &str, doc_id: String, op: Op| Some((name.to_owned(), doc_id, op));
    match plan {
        PhysicalPlan::Document(doc_op) => match doc_op {
            DocumentOp::PointPut {
                collection,
                document_id,
                ..
            } => meta(collection, document_id.clone(), Op::Insert),
            DocumentOp::PointDelete {
                collection,
                document_id,
            } => meta(collection, document_id.clone(), Op::Delete),
            DocumentOp::PointUpdate {
                collection,
                document_id,
                ..
            } => meta(collection, document_id.clone(), Op::Update),
            DocumentOp::Upsert {
                collection,
                document_id,
                ..
            } => meta(collection, document_id.clone(), Op::Insert),
            DocumentOp::BulkUpdate { collection, .. } => {
                meta(collection, WILDCARD.into(), Op::Update)
            }
            DocumentOp::BulkDelete { collection, .. } => {
                meta(collection, WILDCARD.into(), Op::Delete)
            }
            DocumentOp::Truncate { collection } => meta(collection, WILDCARD.into(), Op::Delete),
            // Remaining document ops are reads; nothing to publish.
            _ => None,
        },
        PhysicalPlan::Timeseries(TimeseriesOp::Ingest { collection, .. }) => {
            meta(collection, WILDCARD.into(), Op::Insert)
        }
        PhysicalPlan::Kv(kv_op) => match kv_op {
            KvOp::Put {
                collection, key, ..
            } => meta(collection, String::from_utf8_lossy(key).into_owned(), Op::Insert),
            KvOp::Delete { collection, .. } => meta(collection, WILDCARD.into(), Op::Delete),
            KvOp::FieldSet {
                collection, key, ..
            } => meta(collection, String::from_utf8_lossy(key).into_owned(), Op::Update),
            KvOp::BatchPut { collection, .. } => meta(collection, WILDCARD.into(), Op::Insert),
            KvOp::Truncate { collection } => meta(collection, WILDCARD.into(), Op::Delete),
            // Remaining KV ops are reads; nothing to publish.
            _ => None,
        },
        // Everything else (reads, scans, graph traversals, ...) is non-mutating.
        _ => None,
    }
}
fn is_timeseries_cdc_enabled(shared: &SharedState, tenant_id: TenantId, collection: &str) -> bool {
if let Some(catalog) = shared.credentials.catalog()
&& let Ok(Some(coll)) = catalog.get_collection(tenant_id.as_u32(), collection)
&& coll.collection_type.is_timeseries()
{
if let Some(config) = coll.get_timeseries_config()
&& let Some(cdc_val) = config.get("cdc")
{
return cdc_val.as_str() == Some("true") || cdc_val.as_bool() == Some(true);
}
return false;
}
true
}