1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
// SPDX-License-Identifier: BUSL-1.1
//! PointDelete: remove one document plus its cascading side-effects across
//! inverted, secondary, graph, and spatial indexes.
use tracing::{debug, warn};
use crate::bridge::envelope::{ErrorCode, Response};
use crate::data::executor::core_loop::CoreLoop;
use crate::data::executor::doc_format;
use crate::data::executor::handlers::returning_rows;
use crate::data::executor::task::ExecutionTask;
use crate::engine::document::store::surrogate_to_doc_id;
use nodedb_physical::physical_plan::ReturningSpec;
use nodedb_types::Surrogate;
impl CoreLoop {
    /// Delete one document and cascade the removal into the indexes and
    /// caches that reference it.
    ///
    /// * `tid` / `collection` — tenant and collection that own the row.
    /// * `document_id` — user-visible primary key. Used for logging, the
    ///   graph-edge cascade, referential-integrity bookkeeping
    ///   (`mark_node_deleted`), the Event Plane delete event, and the `id`
    ///   field injected into RETURNING rows.
    /// * `surrogate` — substrate identity. Its row key
    ///   (`surrogate_to_doc_id`) addresses the sparse store, secondary
    ///   indexes, spatial entries and the doc cache; the surrogate itself is
    ///   handed to the inverted-index removal.
    /// * `returning` — optional RETURNING projection, evaluated over the
    ///   pre-delete document.
    ///
    /// Returns an `ErrorCode::Internal` response if the primary delete
    /// fails. Cascade failures are only logged (`warn!`) and never fail the
    /// request. The cascades run whenever the primary delete succeeds, even
    /// if the row did not exist; only the Event Plane emission and the
    /// RETURNING row depend on a prior body having been found.
    pub(in crate::data::executor) fn execute_point_delete(
        &mut self,
        task: &ExecutionTask,
        tid: u64,
        collection: &str,
        document_id: &str,
        surrogate: Surrogate,
        returning: Option<&ReturningSpec>,
    ) -> Response {
        let row_key = surrogate_to_doc_id(surrogate);
        let row_key = row_key.as_str();
        debug!(core = self.core_id, %collection, %document_id, "point delete");

        // Primary delete. Both branches yield the pre-delete body (or `None`
        // when the row did not exist) so the Event Plane sees `old_value`.
        //
        // * Bitemporal collections append a doc tombstone plus versioned
        //   index tombstones for every current field value instead of
        //   physically removing the row.
        // * Other collections delete outright via `sparse.delete`.
        //
        // Current-state-only indexes (text, graph, spatial) are cascaded
        // below in both modes — they track "what exists now" regardless of
        // bitemporal history.
        // NOTE(review): no vector-index cascade is visible in this function;
        // confirm vector entries are removed elsewhere.
        let delete_result: crate::Result<Option<Vec<u8>>> = if self.is_bitemporal(tid, collection)
        {
            let current = self.sparse.versioned_get_current(tid, collection, row_key);
            match current {
                Ok(Some(body)) => self
                    .point_delete_bitemporal_tombstone(tid, collection, row_key, &body)
                    .map(|()| Some(body)),
                Ok(None) => Ok(None),
                Err(e) => Err(e),
            }
        } else {
            self.sparse.delete(tid, collection, row_key)
        };

        let prior = match delete_result {
            Ok(prior) => prior,
            Err(e) => {
                return self.response_error(
                    task,
                    ErrorCode::Internal {
                        detail: e.to_string(),
                    },
                );
            }
        };

        let tenant = crate::types::TenantId::new(tid);

        // Cascade 1: full-text inverted index. `apply_point_put` populates it
        // keyed by the substrate identity (hex surrogate), not the
        // user-visible PK — keep the cascade keyed the same way so a delete
        // actually wipes the term postings.
        if let Err(e) = self.inverted.remove_document(tenant, collection, surrogate) {
            warn!(core = self.core_id, %collection, %document_id, error = %e, "inverted index removal failed");
        }

        // Cascade 2: secondary index entries for this row key.
        // Secondary indexes use key format "{tenant}:{collection}:{field}:{value}:{doc_id}";
        // every entry ending with this doc_id is removed.
        // NOTE(review): this also runs on bitemporal collections — confirm it
        // leaves the versioned index tombstones written above intact.
        if let Err(e) = self
            .sparse
            .delete_indexes_for_document(tid, collection, row_key)
        {
            warn!(core = self.core_id, %collection, %document_id, error = %e, "secondary index cascade failed");
        }

        // Cascade 3: graph edges where this document is src or dst. The
        // persistent edge store is only tombstoned when the in-memory CSR
        // partition reported removed edges.
        let edges_removed = self.csr_partition_mut(tid).remove_node_edges(document_id);
        if edges_removed > 0 {
            let cascade_ord = self.hlc.next_ordinal();
            if let Err(e) = self.edge_store.delete_edges_for_node(
                nodedb_types::TenantId::new(tid),
                document_id,
                cascade_ord,
            ) {
                warn!(core = self.core_id, %document_id, error = %e, "edge cascade failed");
            }
            tracing::trace!(core = self.core_id, %document_id, edges_removed, "EDGE_CASCADE_DELETE");
        }

        // Cascade 4: spatial R-tree indexes + reverse map.
        self.point_delete_spatial_cascade(tenant, collection, row_key);

        // Record deletion for edge referential integrity.
        self.mark_node_deleted(tid, document_id);

        // Invalidate the cached document.
        self.doc_cache.invalidate(
            task.request.database_id.as_u64(),
            tid,
            collection,
            row_key,
        );
        self.checkpoint_coordinator.mark_dirty("sparse", 1);

        // Emit a delete event only when the row actually existed, carrying
        // the pre-delete bytes (converted by `resolve_event_payload` when it
        // yields a payload) as `old_value`. Deleting an absent key emits
        // nothing.
        if let Some(prior_bytes) = prior.as_deref() {
            let old_converted = self.resolve_event_payload(tid, collection, prior_bytes);
            self.emit_write_event(
                task,
                collection,
                crate::event::WriteOp::Delete,
                document_id,
                None,
                Some(old_converted.as_deref().unwrap_or(prior_bytes)),
            );
        }

        match returning {
            Some(spec) => {
                self.point_delete_returning_response(task, spec, document_id, prior.as_deref())
            }
            None => self.response_ok(task),
        }
    }

    /// Append a bitemporal delete for `row_key` inside one sparse write
    /// transaction stamped with `bitemporal_now_ms()`:
    ///
    /// 1. a document tombstone, then
    /// 2. one versioned index tombstone per indexed value extracted from
    ///    `body` (the pre-delete document), lowercased for case-insensitive
    ///    index paths, so `index_lookup_as_of` at or after that instant
    ///    skips this row.
    ///
    /// If no doc config is registered for `(tenant, collection)` or `body`
    /// does not decode, only the document tombstone is written.
    ///
    /// # Errors
    /// Propagates storage errors from opening the transaction, writing a
    /// tombstone, or committing (commit errors are wrapped as
    /// `Error::Storage { engine: "sparse", .. }`). Any early return drops
    /// `txn` without committing it.
    fn point_delete_bitemporal_tombstone(
        &mut self,
        tid: u64,
        collection: &str,
        row_key: &str,
        body: &[u8],
    ) -> crate::Result<()> {
        let sys_from = self.bitemporal_now_ms();
        let txn = self.sparse.begin_write()?;
        self.sparse
            .versioned_tombstone_in_txn(&txn, tid, collection, row_key, sys_from)?;

        let config_key = (crate::types::TenantId::new(tid), collection.to_string());
        // `doc_configs` and `sparse` are distinct fields, so borrowing the
        // config while writing tombstones needs no clone of `index_paths`.
        if let Some(config) = self.doc_configs.get(&config_key)
            && let Some(doc) = doc_format::decode_document(body)
        {
            for path in &config.index_paths {
                for v in crate::engine::document::store::extract_index_values(
                    &doc,
                    &path.path,
                    path.is_array,
                ) {
                    let value = if path.case_insensitive {
                        v.to_lowercase()
                    } else {
                        v
                    };
                    self.sparse.versioned_index_tombstone_in_txn(
                        &txn, tid, collection, &path.path, &value, row_key, sys_from,
                    )?;
                }
            }
        }

        txn.commit().map_err(|e| crate::Error::Storage {
            engine: "sparse".into(),
            detail: format!("commit: {e}"),
        })?;
        Ok(())
    }

    /// Remove `row_key`'s entry from every spatial R-tree registered on
    /// `(tenant, collection)` and drop the matching reverse-map entries.
    ///
    /// `apply_point_put` hashes the substrate row key as the R-tree entry
    /// id, so delete must hash the same key to find the entry. Hashing the
    /// user PK would leak ghost bbox entries that survive the row's removal.
    fn point_delete_spatial_cascade(
        &mut self,
        tenant: crate::types::TenantId,
        collection: &str,
        row_key: &str,
    ) {
        let entry_id = crate::util::fnv1a_hash(row_key.as_bytes());
        // Collect the field names first: the index map is mutated below.
        let spatial_fields: Vec<String> = self
            .spatial_indexes
            .keys()
            .filter(|(t, c, _)| *t == tenant && c == collection)
            .map(|(_, _, f)| f.clone())
            .collect();
        for field in spatial_fields {
            let skey = (tenant, collection.to_string(), field);
            if let Some(rtree) = self.spatial_indexes.get_mut(&skey) {
                rtree.delete(entry_id);
            }
            // Reuse the owned key parts for the reverse-map key.
            let (t, c, f) = skey;
            self.spatial_doc_map.remove(&(t, c, f, entry_id));
        }
    }

    /// Build the RETURNING response for a point delete.
    ///
    /// When `prior` holds the pre-delete body, the single returned row is
    /// that document with `id` set to `document_id`. If the body does not
    /// decode, the row falls back to `{"id": document_id}`. When `prior` is
    /// `None` (the row did not exist) an empty rows payload is returned.
    /// Encoding failures become `ErrorCode::Internal` with a
    /// `"RETURNING encode: …"` detail.
    fn point_delete_returning_response(
        &mut self,
        task: &ExecutionTask,
        spec: &ReturningSpec,
        document_id: &str,
        prior: Option<&[u8]>,
    ) -> Response {
        let rows: Vec<serde_json::Value> = match prior {
            Some(prior_bytes) => {
                let prior_with_id = nodedb_query::msgpack_scan::inject_str_field(
                    prior_bytes,
                    "id",
                    document_id,
                );
                let doc = doc_format::decode_document(&prior_with_id)
                    .unwrap_or_else(|| serde_json::json!({"id": document_id}));
                vec![doc]
            }
            None => Vec::new(),
        };
        match returning_rows::build_rows_payload(spec, rows.as_slice()) {
            Ok(payload) => self.response_with_payload(task, payload),
            Err(e) => self.response_error(
                task,
                ErrorCode::Internal {
                    detail: format!("RETURNING encode: {e}"),
                },
            ),
        }
    }
}