use std::time::Duration;
use tracing::{info, warn};
use crate::control::state::SharedState;
use super::wire::{CompensationHint, DeltaPushMsg, DeltaRejectMsg, SyncFrame, SyncMessageType};
/// Handles a shape-subscribe frame and returns the subscribe response frame.
///
/// The initial snapshot depends on the requested shape type:
///
/// * `Document` — runs a range scan over the collection (empty field, no
///   bounds, limit 10 000, 10 s timeout) and filters the rows through
///   `filter_snapshot_by_predicate`. A failed scan is logged and yields an
///   empty snapshot.
/// * `Vector` — the snapshot data is the collection name's bytes with a
///   document count of zero.
/// * `Graph` — empty snapshot.
/// * `Array` — if the array is missing from `shared.array_sync_schemas`, a
///   warning is logged and the response carries an empty snapshot; otherwise
///   a subscriber cursor is registered for the session and the snapshot is
///   empty.
/// * any other variant — logged and answered with an empty snapshot.
///
/// # Returns
///
/// `None` when the frame body cannot be decoded or the tenant quota check
/// fails; otherwise whatever `handle_subscribe` returns.
pub(super) async fn handle_shape_subscribe_async(
    shared: &SharedState,
    session: &super::session::SyncSession,
    frame: &SyncFrame,
) -> Option<SyncFrame> {
    use super::shape::handler::{handle_subscribe, ShapeSnapshotData, ShapeSubscribeMsg};
    use super::shape::registry::ShapeRegistry;
    use crate::bridge::envelope::PhysicalPlan;
    use crate::control::server::pgwire::ddl::sync_dispatch::dispatch_async;
    use crate::types::TenantId;
    use nodedb_physical::physical_plan::DocumentOp;
    use nodedb_types::sync::shape::ShapeType;

    /// Calls `tenant_request_end` on drop so the matching
    /// `tenant_request_start` is balanced on every exit path, including the
    /// future being dropped while the snapshot query is still awaiting.
    struct TenantRequestGuard<'a> {
        shared: &'a SharedState,
        tid: TenantId,
    }

    impl Drop for TenantRequestGuard<'_> {
        fn drop(&mut self) {
            self.shared.tenant_request_end(self.tid);
        }
    }

    let msg: ShapeSubscribeMsg = frame.decode_body()?;

    // Sessions without a tenant are accounted against tenant 0.
    let tenant_id = session.tenant_id.map_or(0, |t| t.as_u64());
    let tid = TenantId::new(tenant_id);

    if let Err(e) = shared.check_tenant_quota(tid) {
        warn!(tenant_id, error = %e, "sync: shape subscribe rejected by quota");
        return None;
    }

    // Watermark reported to the subscriber: one below the next LSN the WAL
    // would hand out, saturating at zero.
    let current_lsn = shared.wal.next_lsn().as_u64().saturating_sub(1);

    shared.tenant_request_start(tid);
    let in_flight = TenantRequestGuard { shared, tid };

    let snapshot_data = match &msg.shape.shape_type {
        ShapeType::Document {
            collection,
            predicate,
        } => {
            let plan = PhysicalPlan::Document(DocumentOp::RangeScan {
                collection: collection.clone(),
                field: String::new(),
                lower: None,
                upper: None,
                limit: 10_000,
            });
            match dispatch_async(shared, tid, collection, plan, Duration::from_secs(10)).await {
                Ok(payload) => {
                    filter_snapshot_by_predicate(payload, predicate, &msg.shape.shape_id)
                }
                Err(e) => {
                    warn!(
                        shape_id = %msg.shape.shape_id,
                        error = %e,
                        "shape snapshot query failed, sending empty snapshot"
                    );
                    ShapeSnapshotData::empty()
                }
            }
        }
        ShapeType::Vector { collection, .. } => ShapeSnapshotData {
            data: collection.as_bytes().to_vec(),
            doc_count: 0,
        },
        ShapeType::Graph { .. } => ShapeSnapshotData::empty(),
        ShapeType::Array {
            array_name,
            coord_range,
        } => {
            let array_known = shared.array_sync_schemas.schema_hlc(array_name).is_some();
            if !array_known {
                warn!(
                    session = %session.session_id,
                    array = %array_name,
                    "array shape subscribe: array not known to Origin schema registry"
                );
                // End the request accounting before building the response,
                // matching the ordering of the normal path below.
                drop(in_flight);
                return handle_subscribe(
                    &session.session_id,
                    tenant_id,
                    &msg,
                    &ShapeRegistry::new(),
                    current_lsn,
                    |_, _| ShapeSnapshotData::empty(),
                );
            }
            shared.array_subscriber_cursors.register(
                &session.session_id,
                array_name,
                coord_range.clone(),
            );
            info!(
                session = %session.session_id,
                array = %array_name,
                "array shape subscribed; cursor initialized at HLC::ZERO"
            );
            ShapeSnapshotData::empty()
        }
        _ => {
            warn!(
                session = %session.session_id,
                "shape subscribe: unknown shape_type variant, sending empty snapshot"
            );
            ShapeSnapshotData::empty()
        }
    };

    // Snapshot work is finished; release the in-flight request slot now.
    drop(in_flight);

    let registry = ShapeRegistry::new();
    let response = handle_subscribe(
        &session.session_id,
        tenant_id,
        &msg,
        &registry,
        current_lsn,
        |_shape, _lsn| snapshot_data,
    );
    info!(
        session = %session.session_id,
        shape_id = %msg.shape.shape_id,
        lsn = current_lsn,
        "shape subscribed with WAL LSN watermark"
    );
    response
}
pub(super) async fn validate_delta_constraints(
shared: &SharedState,
delta_msg: &DeltaPushMsg,
ack_frame: SyncFrame,
) -> Option<SyncFrame> {
use crate::bridge::envelope::PhysicalPlan;
use crate::control::server::pgwire::ddl::sync_dispatch::dispatch_async_with_source;
use crate::types::TenantId;
use nodedb_physical::physical_plan::CrdtOp;
let tenant_id = TenantId::new(0);
if let Err(e) = shared.check_tenant_quota(tenant_id) {
warn!(error = %e, "sync: delta validation rejected by quota");
let reject = DeltaRejectMsg {
mutation_id: delta_msg.mutation_id,
reason: e.to_string(),
compensation: Some(CompensationHint::Custom {
constraint: "quota".into(),
detail: e.to_string(),
}),
};
return SyncFrame::try_encode(SyncMessageType::DeltaReject, &reject);
}
let surrogate = match shared
.surrogate_assigner
.assign(&delta_msg.collection, delta_msg.document_id.as_bytes())
{
Ok(s) => s,
Err(e) => {
warn!(error = %e, "sync: surrogate assignment failed");
let reject = DeltaRejectMsg {
mutation_id: delta_msg.mutation_id,
reason: e.to_string(),
compensation: Some(CompensationHint::Custom {
constraint: "surrogate".into(),
detail: e.to_string(),
}),
};
return SyncFrame::try_encode(SyncMessageType::DeltaReject, &reject);
}
};
let plan = PhysicalPlan::Crdt(CrdtOp::Apply {
collection: delta_msg.collection.clone(),
document_id: delta_msg.document_id.clone(),
delta: delta_msg.delta.clone(),
peer_id: delta_msg.peer_id,
mutation_id: delta_msg.mutation_id,
surrogate,
});
shared.tenant_request_start(tenant_id);
let dispatch_result = dispatch_async_with_source(
shared,
tenant_id,
&delta_msg.collection,
plan,
Duration::from_secs(10),
crate::event::EventSource::CrdtSync,
)
.await;
shared.tenant_request_end(tenant_id);
match dispatch_result {
Ok(_payload) => {
Some(ack_frame)
}
Err(e) => {
let error_detail = e.to_string();
warn!(
collection = %delta_msg.collection,
doc = %delta_msg.document_id,
error = %error_detail,
"sync: delta constraint violation"
);
let hint = if error_detail.contains("unique") || error_detail.contains("UNIQUE") {
CompensationHint::UniqueViolation {
field: "unknown".into(),
conflicting_value: delta_msg.document_id.clone(),
}
} else if error_detail.contains("foreign") || error_detail.contains("FK") {
CompensationHint::ForeignKeyMissing {
referenced_id: delta_msg.document_id.clone(),
}
} else {
CompensationHint::Custom {
constraint: "constraint".into(),
detail: error_detail.clone(),
}
};
let reject = DeltaRejectMsg {
mutation_id: delta_msg.mutation_id,
reason: error_detail,
compensation: Some(hint),
};
SyncFrame::try_encode(SyncMessageType::DeltaReject, &reject)
}
}
}
/// Narrows a raw scan payload to the documents matching a shape predicate.
///
/// * An empty `predicate_bytes` returns `payload` unchanged, with the
///   document count taken from decoding it.
/// * Otherwise the predicate is decoded from MessagePack into a
///   `MetadataFilter`, each decoded document is converted to JSON and tested
///   against it, and the matching rows are re-encoded.
///
/// A predicate that fails to decode, or rows that fail to re-encode, are
/// logged with `shape_id` and produce an empty snapshot.
fn filter_snapshot_by_predicate(
    payload: Vec<u8>,
    predicate_bytes: &[u8],
    shape_id: &str,
) -> super::shape::handler::ShapeSnapshotData {
    use super::shape::handler::ShapeSnapshotData;
    use crate::data::executor::response_codec::{
        decode_raw_scan_to_docs, encode_raw_document_rows,
    };
    use nodedb_query::metadata_filter::matches_metadata_filter;
    use nodedb_types::filter::MetadataFilter;

    // No predicate: pass the payload through and only count its documents.
    if predicate_bytes.is_empty() {
        let doc_count = decode_raw_scan_to_docs(&payload).len();
        return ShapeSnapshotData {
            data: payload,
            doc_count,
        };
    }

    let filter = match zerompk::from_msgpack::<MetadataFilter>(predicate_bytes) {
        Ok(decoded) => decoded,
        Err(decode_err) => {
            warn!(
                shape_id,
                error = %decode_err,
                "shape snapshot: failed to decode predicate; sending empty snapshot"
            );
            return ShapeSnapshotData::empty();
        }
    };

    // Keep only the rows whose JSON form satisfies the filter.
    let kept: Vec<(String, Vec<u8>)> = decode_raw_scan_to_docs(&payload)
        .into_iter()
        .filter(|(_, raw)| {
            let as_json = crate::control::server::sync::security::delta_bytes_to_json(raw);
            matches_metadata_filter(&as_json, &filter)
        })
        .collect();

    let doc_count = kept.len();
    match encode_raw_document_rows(&kept) {
        Ok(data) => ShapeSnapshotData { data, doc_count },
        Err(encode_err) => {
            warn!(
                shape_id,
                error = %encode_err,
                "shape snapshot: failed to encode filtered rows; sending empty snapshot"
            );
            ShapeSnapshotData::empty()
        }
    }
}