use std::time::Duration;
use tracing::{info, warn};
use crate::control::state::SharedState;
use super::wire::{CompensationHint, DeltaPushMsg, DeltaRejectMsg, SyncFrame, SyncMessageType};
pub(super) async fn handle_shape_subscribe_async(
shared: &SharedState,
session: &super::session::SyncSession,
frame: &SyncFrame,
) -> Option<SyncFrame> {
use crate::bridge::envelope::PhysicalPlan;
use crate::bridge::physical_plan::DocumentOp;
use crate::control::server::pgwire::ddl::sync_dispatch::dispatch_async;
use crate::types::TenantId;
let msg: super::shape::handler::ShapeSubscribeMsg = frame.decode_body()?;
let tenant_id = session.tenant_id.map(|t| t.as_u32()).unwrap_or(0);
let current_lsn = shared.wal.next_lsn().as_u64().saturating_sub(1);
let snapshot_data = match &msg.shape.shape_type {
nodedb_types::sync::shape::ShapeType::Document { collection, .. } => {
let plan = PhysicalPlan::Document(DocumentOp::RangeScan {
collection: collection.clone(),
field: String::new(), lower: None,
upper: None,
limit: 10_000, });
match dispatch_async(
shared,
TenantId::new(tenant_id),
collection,
plan,
Duration::from_secs(10),
)
.await
{
Ok(payload) => super::shape::handler::ShapeSnapshotData {
data: payload,
doc_count: 1, },
Err(e) => {
tracing::warn!(
shape_id = %msg.shape.shape_id,
error = %e,
"shape snapshot query failed, sending empty snapshot"
);
super::shape::handler::ShapeSnapshotData::empty()
}
}
}
nodedb_types::sync::shape::ShapeType::Vector { collection, .. } => {
super::shape::handler::ShapeSnapshotData {
data: collection.as_bytes().to_vec(),
doc_count: 0,
}
}
nodedb_types::sync::shape::ShapeType::Graph { .. } => {
super::shape::handler::ShapeSnapshotData::empty()
}
};
let registry = super::shape::registry::ShapeRegistry::new();
let response = super::shape::handler::handle_subscribe(
&session.session_id,
tenant_id,
&msg,
®istry,
current_lsn,
|_shape, _lsn| snapshot_data,
);
info!(
session = %session.session_id,
shape_id = %msg.shape.shape_id,
lsn = current_lsn,
"shape subscribed with WAL LSN watermark"
);
Some(response)
}
/// Dispatches a CRDT apply for a pushed delta and converts the outcome into
/// the frame to send back.
///
/// # Parameters
/// - `shared`: shared server state used for dispatch.
/// - `delta_msg`: the pushed delta (collection, document id, delta payload,
///   peer id and mutation id are forwarded into the apply operation).
/// - `ack_frame`: the frame returned unchanged when the dispatch succeeds.
///
/// # Returns
/// `ack_frame` on success. On failure, a `DeltaReject` frame whose reason is
/// the error text and whose compensation hint is chosen from that text:
/// - contains `"unique"` or `"UNIQUE"`: `UniqueViolation`
/// - contains `"foreign"` or `"FK"`: `ForeignKeyMissing`
/// - anything else: `Custom` carrying the full error text
pub(super) async fn validate_delta_constraints(
    shared: &SharedState,
    delta_msg: &DeltaPushMsg,
    ack_frame: SyncFrame,
) -> SyncFrame {
    use crate::bridge::envelope::PhysicalPlan;
    use crate::bridge::physical_plan::CrdtOp;
    use crate::control::server::pgwire::ddl::sync_dispatch::dispatch_async;
    use crate::types::TenantId;

    // NOTE(review): the tenant is fixed at 0 here; no session is available in
    // this function's parameters — confirm this is intended.
    let tenant = TenantId::new(0);
    let apply_plan = PhysicalPlan::Crdt(CrdtOp::Apply {
        collection: delta_msg.collection.clone(),
        document_id: delta_msg.document_id.clone(),
        delta: delta_msg.delta.clone(),
        peer_id: delta_msg.peer_id,
        mutation_id: delta_msg.mutation_id,
    });

    let outcome = dispatch_async(
        shared,
        tenant,
        &delta_msg.collection,
        apply_plan,
        Duration::from_secs(10),
    )
    .await;

    // Success path: the caller's ack goes back untouched.
    let failure = match outcome {
        Ok(_) => return ack_frame,
        Err(failure) => failure,
    };

    let reason = failure.to_string();
    warn!(
        collection = %delta_msg.collection,
        doc = %delta_msg.document_id,
        error = %reason,
        "sync: delta constraint violation"
    );

    // Classify the failure by keywords in the error text.
    let mentions_any = |needles: &[&str]| needles.iter().any(|needle| reason.contains(*needle));
    let compensation = if mentions_any(&["unique", "UNIQUE"]) {
        CompensationHint::UniqueViolation {
            field: "unknown".into(),
            conflicting_value: delta_msg.document_id.clone(),
        }
    } else if mentions_any(&["foreign", "FK"]) {
        CompensationHint::ForeignKeyMissing {
            referenced_id: delta_msg.document_id.clone(),
        }
    } else {
        CompensationHint::Custom {
            constraint: "constraint".into(),
            detail: reason.clone(),
        }
    };

    let rejection = DeltaRejectMsg {
        mutation_id: delta_msg.mutation_id,
        reason,
        compensation: Some(compensation),
    };
    SyncFrame::encode_or_empty(SyncMessageType::DeltaReject, &rejection)
}