1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
// SPDX-License-Identifier: BUSL-1.1
//! Spatial sync ingest handlers: insert/delete geometries into the R-tree
//! on behalf of a Lite sync client.
//!
//! Called by `dispatch_spatial` when the plan variant is
//! `SpatialOp::Insert` or `SpatialOp::Delete`.
//!
//! ## Surrogate-keyed storage
//!
//! Both the R-tree entry and the sparse document body are keyed by the
//! hex-encoded surrogate (assigned on the Control Plane before dispatch).
//! This matches the direct `INSERT INTO ... VALUES (ST_GeomFromText(...))`
//! path so cross-engine prefilter (roaring-bitmap intersect against the
//! surrogate space) just works — `spatial_doc_map` stores the same
//! 8-char hex string that `surrogate_to_doc_id(surrogate)` produces, which
//! is what the scan path parses via `u32::from_str_radix(doc_id, 16)`.
//!
//! ## Document-store parity
//!
//! Origin spatial scans read document bytes from the sparse store to apply
//! exact OGC predicates. When syncing from Lite, we therefore also write a
//! minimal geometry document (`{field: <geometry>, "id": surrogate_hex}`)
//! to the sparse engine in addition to the R-tree entry. Any failure to
//! materialise that body is a hard error (no silent partial-success); the
//! whole op fails and the Control Plane converts it to a rejection ACK.
use tracing::{debug, error};
use crate::bridge::envelope::Response;
use crate::data::executor::core_loop::CoreLoop;
use crate::data::executor::task::ExecutionTask;
use crate::engine::document::store::surrogate_to_doc_id;
use crate::engine::spatial::RTreeEntry;
use crate::types::TenantId;
use crate::util::fnv1a_hash;
use nodedb_types::Surrogate;
use nodedb_types::bbox::geometry_bbox;
use nodedb_types::geometry::Geometry;
impl CoreLoop {
/// Insert a geometry into the per-field R-tree and the sparse document store
/// on behalf of a Lite client.
///
/// Both writes use the hex-encoded surrogate as the storage key so
/// cross-engine surrogate prefilters resolve without translation.
///
/// Fails fast (returns a Response::Error) if either:
/// - the geometry document cannot be msgpack-serialised, or
/// - the sparse document write fails.
///
/// If an entry with the same surrogate already exists, the R-tree entry is
/// replaced (upsert semantics).
pub(in crate::data::executor) fn execute_spatial_insert(
&mut self,
task: &ExecutionTask,
tid: u64,
collection: &str,
field: &str,
surrogate: Surrogate,
geometry: &Geometry,
) -> Response {
let doc_id = surrogate_to_doc_id(surrogate);
debug!(
core = self.core_id,
%collection,
%field,
doc_id = %doc_id,
surrogate = surrogate.as_u32(),
"spatial sync: insert geometry"
);
// ── 1. Serialise + write minimal geometry document to sparse store ──
let geom_value = geometry_to_value(geometry);
let mut doc_map = std::collections::HashMap::new();
doc_map.insert(field.to_string(), geom_value);
doc_map.insert(
"id".to_string(),
nodedb_types::Value::String(doc_id.clone()),
);
let doc_value = nodedb_types::Value::Object(doc_map);
// Use standard msgpack map format so scan_collection / decode_document_value
// can read it back. zerompk::to_msgpack_vec encodes Value in a tagged array
// format the document scan path does not understand.
let msgpack = match nodedb_types::value_to_msgpack(&doc_value) {
Ok(b) => b,
Err(e) => {
error!(
core = self.core_id,
%collection,
%field,
doc_id = %doc_id,
error = %e,
"spatial sync: geometry document msgpack serialisation failed"
);
return self.response_error(
task,
crate::Error::Internal {
detail: format!("spatial sync: serialise geometry document: {e}"),
},
);
}
};
if let Err(e) = self.sparse.put(tid, collection, &doc_id, &msgpack) {
error!(
core = self.core_id,
%collection,
doc_id = %doc_id,
error = %e,
"spatial sync: sparse document write failed"
);
return self.response_error(task, e);
}
// ── 2. Update R-tree ────────────────────────────────────────────────
let tenant_id = TenantId::new(tid);
let spatial_key = (tenant_id, collection.to_string(), field.to_string());
let entry_id = fnv1a_hash(doc_id.as_bytes());
let doc_map_key = (
tenant_id,
collection.to_string(),
field.to_string(),
entry_id,
);
// If the surrogate was previously inserted, remove the stale entry
// first so the R-tree doesn't accumulate duplicate bboxes.
if let Some(rtree) = self.spatial_indexes.get_mut(&spatial_key)
&& self.spatial_doc_map.contains_key(&doc_map_key)
{
rtree.delete(entry_id);
}
let bbox = geometry_bbox(geometry);
let rtree = self.spatial_indexes.entry(spatial_key).or_default();
rtree.insert(RTreeEntry { id: entry_id, bbox });
self.spatial_doc_map.insert(doc_map_key, doc_id);
self.response_ok(task)
}
/// Remove a document's geometry from the per-field R-tree and the sparse
/// document store on behalf of a Lite client.
///
/// Keyed by the same hex-encoded surrogate used at insert time. A delete
/// of a non-existent surrogate is a no-op (idempotent).
///
/// Fails fast if the sparse store delete returns an error (other than
/// "not found", which the sparse engine surfaces as `Ok`).
pub(in crate::data::executor) fn execute_spatial_delete(
&mut self,
task: &ExecutionTask,
tid: u64,
collection: &str,
field: &str,
surrogate: Surrogate,
) -> Response {
let doc_id = surrogate_to_doc_id(surrogate);
debug!(
core = self.core_id,
%collection,
%field,
doc_id = %doc_id,
surrogate = surrogate.as_u32(),
"spatial sync: delete geometry"
);
if let Err(e) = self.sparse.delete(tid, collection, &doc_id) {
error!(
core = self.core_id,
%collection,
doc_id = %doc_id,
error = %e,
"spatial sync: sparse document delete failed"
);
return self.response_error(task, e);
}
let tenant_id = TenantId::new(tid);
let spatial_key = (tenant_id, collection.to_string(), field.to_string());
let entry_id = fnv1a_hash(doc_id.as_bytes());
let doc_map_key = (
tenant_id,
collection.to_string(),
field.to_string(),
entry_id,
);
if let Some(rtree) = self.spatial_indexes.get_mut(&spatial_key) {
rtree.delete(entry_id);
}
self.spatial_doc_map.remove(&doc_map_key);
self.response_ok(task)
}
}
/// Convert a `Geometry` into a `nodedb_types::Value` for msgpack serialisation.
///
/// Produces a JSON-compatible nested structure matching what a pgwire INSERT
/// with a GeoJSON geometry value would produce.
fn geometry_to_value(geometry: &Geometry) -> nodedb_types::Value {
use nodedb_types::Value;
use nodedb_types::geometry::Geometry::*;
match geometry {
Point { coordinates } => {
let mut m = std::collections::HashMap::new();
m.insert("type".to_string(), Value::String("Point".to_string()));
m.insert(
"coordinates".to_string(),
Value::Array(vec![
Value::Float(coordinates[0]),
Value::Float(coordinates[1]),
]),
);
Value::Object(m)
}
LineString { coordinates } => {
let mut m = std::collections::HashMap::new();
m.insert("type".to_string(), Value::String("LineString".to_string()));
m.insert(
"coordinates".to_string(),
Value::Array(
coordinates
.iter()
.map(|c| Value::Array(vec![Value::Float(c[0]), Value::Float(c[1])]))
.collect(),
),
);
Value::Object(m)
}
Polygon { coordinates } => {
let mut m = std::collections::HashMap::new();
m.insert("type".to_string(), Value::String("Polygon".to_string()));
m.insert(
"coordinates".to_string(),
Value::Array(
coordinates
.iter()
.map(|ring| {
Value::Array(
ring.iter()
.map(|c| {
Value::Array(vec![Value::Float(c[0]), Value::Float(c[1])])
})
.collect(),
)
})
.collect(),
),
);
Value::Object(m)
}
// For other geometry types, fall back to JSON serialisation then
// parse as Value. These are less common in sync workloads.
other => match sonic_rs::to_string(other) {
Ok(json) => match sonic_rs::from_str::<serde_json::Value>(&json) {
Ok(v) => nodedb_types::conversion::json_to_value(v),
Err(_) => Value::Null,
},
Err(_) => Value::Null,
},
}
}