// SPDX-License-Identifier: BUSL-1.1
//! Upsert handler: insert if absent, merge fields if present.
//!
//! Works for schemaless and strict collections. All internal transport
//! uses nodedb_types::Value + zerompk (msgpack). No JSON roundtrips.
use tracing::debug;
use crate::bridge::envelope::{ErrorCode, Response};
use crate::data::executor::core_loop::CoreLoop;
use crate::data::executor::handlers::point::apply_put::PointPutParams;
use crate::data::executor::task::ExecutionTask;
use crate::engine::document::store::surrogate_to_doc_id;
use nodedb_types::Surrogate;
impl CoreLoop {
/// Upsert: insert if absent, merge fields if present.
///
/// If a document with `document_id` exists, merges `value` fields into the
/// existing document (preserving fields not in `value`). If it doesn't exist,
/// inserts as a new document (identical to PointPut).
///
/// `value` is msgpack-encoded (zerompk). Strict collections decode binary
/// tuples for existing docs, merge, and re-encode via `apply_point_put`.
#[allow(clippy::too_many_arguments)]
pub(in crate::data::executor) fn execute_upsert(
&mut self,
task: &ExecutionTask,
tid: u64,
collection: &str,
document_id: &str,
surrogate: Surrogate,
value: &[u8],
on_conflict_updates: &[(String, crate::bridge::physical_plan::UpdateValue)],
) -> Response {
let row_key = surrogate_to_doc_id(surrogate);
let row_key = row_key.as_str();
debug!(
core = self.core_id,
%collection,
%document_id,
has_on_conflict = !on_conflict_updates.is_empty(),
"upsert"
);
// Detect strict storage mode for this collection.
let config_key = (crate::types::TenantId::new(tid), collection.to_string());
let strict_schema = self.doc_configs.get(&config_key).and_then(|config| {
if let crate::bridge::physical_plan::StorageMode::Strict { ref schema } =
config.storage_mode
{
Some(schema.clone())
} else {
None
}
});
// Check if document already exists. Bitemporal collections consult
// the versioned table's current-state view (reverse-scan to newest
// non-tombstone); non-bitemporal collections use the legacy point
// lookup.
let bitemporal = self.is_bitemporal(tid, collection);
let existing = if bitemporal {
self.sparse.versioned_get_current(tid, collection, row_key)
} else {
self.sparse.get(tid, collection, row_key)
};
match existing {
Ok(Some(current_bytes)) => {
// Decode existing document to nodedb_types::Value.
let existing_val = if let Some(ref schema) = strict_schema {
// Strict: binary tuple → Value via schema.
match super::super::strict_format::binary_tuple_to_value(¤t_bytes, schema)
{
Some(v) => v,
None => {
// Fallback: try msgpack (migration case).
match nodedb_types::value_from_msgpack(¤t_bytes) {
Ok(v) => v,
Err(_) => {
return self.response_error(
task,
ErrorCode::Internal {
detail: "failed to decode document for upsert".into(),
},
);
}
}
}
}
} else {
// Schemaless: stored as msgpack.
match nodedb_types::value_from_msgpack(¤t_bytes) {
Ok(v) => v,
Err(_) => {
return self.response_error(
task,
ErrorCode::Internal {
detail: "failed to decode document for upsert".into(),
},
);
}
}
};
// Decode incoming value (msgpack → Value).
let new_val = match nodedb_types::value_from_msgpack(value) {
Ok(v) => v,
Err(_) => {
return self.response_error(
task,
ErrorCode::Internal {
detail: "failed to decode upsert value from msgpack".into(),
},
);
}
};
// Conflict branch: if `ON CONFLICT DO UPDATE SET` assignments
// are present, evaluate each against the *existing* row and
// apply only those fields. Otherwise fall back to the plain
// merge semantics used by `UPSERT INTO` / no-action upserts.
let merged = if on_conflict_updates.is_empty() {
merge_values(existing_val, new_val)
} else {
apply_on_conflict_updates(existing_val, &new_val, on_conflict_updates)
};
let sys_from_ms = if bitemporal {
self.bitemporal_now_ms()
} else {
0
};
// Encode merged value for storage.
let stored_bytes = if let Some(ref schema) = strict_schema {
let result = if bitemporal && schema.bitemporal {
super::super::strict_format::value_to_binary_tuple_bitemporal(
&merged,
schema,
sys_from_ms,
i64::MIN,
i64::MAX,
)
} else {
super::super::strict_format::value_to_binary_tuple(&merged, schema)
};
match result {
Ok(bt) => bt,
Err(e) => {
return self.response_error(
task,
ErrorCode::Internal {
detail: format!("binary tuple encode: {e}"),
},
);
}
}
} else {
// Schemaless: encode to msgpack.
match nodedb_types::value_to_msgpack(&merged) {
Ok(b) => b,
Err(_) => {
return self.response_error(
task,
ErrorCode::Internal {
detail: "failed to encode merged upsert value".into(),
},
);
}
}
};
// Write directly to storage. `current_bytes` is the
// pre-merge stored row, already read above — thread it to
// the Event Plane as `old_value` so the emitted WriteOp
// resolves to Update. Bitemporal collections append a new
// version instead of overwriting.
let write_result = if bitemporal {
self.sparse
.versioned_put(crate::engine::sparse::btree_versioned::VersionedPut {
tenant: tid,
coll: collection,
doc_id: row_key,
sys_from_ms,
valid_from_ms: i64::MIN,
valid_until_ms: i64::MAX,
body: &stored_bytes,
})
.map(|()| None::<Vec<u8>>)
} else {
self.sparse.put(tid, collection, row_key, &stored_bytes)
};
match write_result {
Ok(_prior) => {
self.doc_cache.put(
task.request.database_id.as_u64(),
tid,
collection,
row_key,
&stored_bytes,
);
self.emit_put_event(
task,
tid,
collection,
row_key,
&stored_bytes,
Some(¤t_bytes),
);
self.response_ok(task)
}
Err(e) => self.response_error(
task,
ErrorCode::Internal {
detail: e.to_string(),
},
),
}
}
Ok(None) => {
// Insert: document doesn't exist, create new (same as PointPut).
let txn = match self.sparse.begin_write() {
Ok(t) => t,
Err(e) => {
return self.response_error(
task,
ErrorCode::Internal {
detail: e.to_string(),
},
);
}
};
// `apply_point_put` returns prior bytes if any; here the
// existence probe just above found none, and apply_point_put
// is the only writer on this core — prior must be None. We
// pass it straight through so the emit resolves to Insert.
let prior = match self.apply_point_put(
&txn,
PointPutParams {
database_id: task.request.database_id.as_u64(),
tid,
collection,
document_id: row_key,
surrogate,
value,
},
) {
Ok(p) => p,
Err(e) => {
return self.response_error(
task,
ErrorCode::Internal {
detail: e.to_string(),
},
);
}
};
if let Err(e) = txn.commit() {
return self.response_error(
task,
ErrorCode::Internal {
detail: format!("commit: {e}"),
},
);
}
self.emit_put_event(task, tid, collection, row_key, value, prior.as_deref());
self.response_ok(task)
}
Err(e) => self.response_error(
task,
ErrorCode::Internal {
detail: e.to_string(),
},
),
}
}
}
/// Apply `ON CONFLICT DO UPDATE SET` assignments against the existing row.
///
/// Each assignment's RHS is evaluated via `SqlExpr::eval` — identical to
/// the UPDATE handler's path — so arithmetic (`n = n + 1`), functions
/// (`name = UPPER(name)`), `CASE`, and concatenation all work. Literal
/// assignments bypass the evaluator and decode their msgpack directly.
pub(in crate::data::executor) fn apply_on_conflict_updates(
existing: nodedb_types::Value,
excluded: &nodedb_types::Value,
updates: &[(String, crate::bridge::physical_plan::UpdateValue)],
) -> nodedb_types::Value {
let mut obj = match existing {
nodedb_types::Value::Object(map) => map,
// If the existing row isn't an object (shouldn't happen for
// document engines) fall back to the assignments as a blank slate.
_ => std::collections::HashMap::new(),
};
// Snapshot the row before any assignment applies, so all assignments
// see the pre-update state — matches PostgreSQL semantics. `excluded`
// is the row proposed for INSERT that triggered the conflict — it
// resolves `EXCLUDED.col` references inside the RHS expressions.
let snapshot = nodedb_types::Value::Object(obj.clone());
for (field, update_val) in updates {
let new_val: nodedb_types::Value = match update_val {
crate::bridge::physical_plan::UpdateValue::Literal(bytes) => {
match nodedb_types::value_from_msgpack(bytes) {
Ok(v) => v,
Err(_) => continue,
}
}
crate::bridge::physical_plan::UpdateValue::Expr(expr) => {
expr.eval_with_excluded(&snapshot, excluded)
}
};
obj.insert(field.clone(), new_val);
}
nodedb_types::Value::Object(obj)
}
/// Merge two `nodedb_types::Value` objects: overlay `new` fields onto `existing`.
fn merge_values(existing: nodedb_types::Value, new: nodedb_types::Value) -> nodedb_types::Value {
match (existing, new) {
(nodedb_types::Value::Object(mut existing_map), nodedb_types::Value::Object(new_map)) => {
for (k, v) in new_map {
existing_map.insert(k, v);
}
nodedb_types::Value::Object(existing_map)
}
// If shapes don't match, new value wins entirely.
(_, new) => new,
}
}