moltendb_core/handlers/process_set.rs
1use tracing::debug;
2use serde_json::{Value, json};
3use crate::validation;
4use crate::engine;
5use uuid::Uuid;
6
7
8fn resolve_extends(doc: Value, db: &engine::Db) -> Value {
9 // Only objects can have an `extends` block — pass everything else through unchanged.
10 let obj = match doc.as_object() {
11 Some(o) => o,
12 None => return doc,
13 };
14
15 // If there is no `extends` key, return the document unchanged.
16 // This is the fast path for the vast majority of inserts.
17 let extends_map = match obj.get("extends").and_then(|v| v.as_object()) {
18 Some(m) => m.clone(),
19 None => return doc,
20 };
21
22 // Clone the document into a mutable map and remove the `extends` key.
23 // The stored document must never contain `extends` — it is a directive,
24 // not a data field.
25 let mut result = obj.clone();
26 result.remove("extends");
27
28 // For each alias → "collection.key" reference, fetch the referenced document
29 // and embed it under the alias key.
30 for (alias, ref_val) in &extends_map {
31 if let Some(ref_str) = ref_val.as_str() {
32 // Split "collection.key" on the FIRST dot only.
33 // This allows keys that themselves contain dots (e.g. "memory.mem4.v2").
34 if let Some(dot_pos) = ref_str.find('.') {
35 let ref_collection = &ref_str[..dot_pos]; // e.g. "memory"
36 let ref_key = &ref_str[dot_pos + 1..]; // e.g. "mem4"
37
38 // O(1) hash-map lookup — no scanning, no joins at query time.
39 if let Some(referenced_doc) = db.get(ref_collection, vec![ref_key.to_string()]).remove(ref_key) {
40 // Embed the full referenced document under the alias key.
41 result.insert(alias.clone(), referenced_doc);
42 }
43 // If the reference is not found, the alias key is simply not added.
44 // We never fail the insert because of an unresolvable reference.
45 }
46 }
47 }
48
49 Value::Object(result)
50}
51
52/// Handle a SET (insert/upsert) request.
53///
54/// Accepts two data formats:
55/// - Object map: { "collection": "users", "data": { "u1": {...}, "u2": {...} } }
56/// Keys are provided by the client. Existing documents are overwritten.
57/// - Array: { "collection": "users", "data": [ {...}, {...} ] }
58/// Keys are auto-generated as UUIDv7 strings. Returns the generated IDs.
59pub fn process_set(db: &engine::Db, payload: &Value, max_body_size: usize, max_keys_per_request: usize) -> (u16, Value) {
60 // Only "collection" and "data" are valid for a set/insert request.
61 const SET_ALLOWED: &[&str] = &["collection", "data"];
62 if let Err(e) = validation::validate_allowed_properties(payload, SET_ALLOWED) {
63 return (400, json!({ "error": e.to_string(), "statusCode": 400 }));
64 }
65 if let Err(e) = validation::validate_request(payload, max_body_size, max_keys_per_request) {
66 return (400, json!({ "error": e.to_string(), "statusCode": 400 }));
67 }
68
69 let col = payload["collection"].as_str().unwrap_or("default");
70
71 match payload.get("data") {
72 // ── Object map format ─────────────────────────────────────────────────
73 // { "data": { "u1": { "name": "Alice" }, "u2": { "name": "Bob" } } }
74 Some(Value::Object(data_map)) => {
75 // Collect all key-value pairs into a Vec for batch insert.
76 let mut items = Vec::new();
77 for (k, v) in data_map {
78 let resolved = resolve_extends(v.clone(), db);
79 items.push((k.clone(), resolved));
80 }
81
82 match db.insert(col, items) {
83 Ok(_) => {
84 // Check collection size for auto-eviction (Hybrid Bitcask).
85 if let Ok(count) = db.evict_collection(col, db.hot_threshold) {
86 if count > 0 {
87 debug!("❄️ Auto-evicted {} documents from {} to disk", count, col);
88 }
89 }
90 (200, json!({ "status": "ok", "count": data_map.len() }))
91 },
92 Err(engine::DbError::Conflict) => (409, json!({ "error": "Conflict: Document version is outdated", "statusCode": 409 })),
93 #[cfg(feature = "schema")]
94 Err(engine::DbError::SchemaValidationError(msg)) => (400, json!({ "error": msg, "statusCode": 400 })),
95 Err(e) => (500, json!({ "error": "Database write failed", "details": e.to_string(), "statusCode": 500 }))
96 }
97 },
98
99 // ── Array format ──────────────────────────────────────────────────────
100 // { "data": [ { "name": "Alice" }, { "name": "Bob" } ] }
101 // Auto-generates UUIDv7 keys for each document.
102 Some(Value::Array(data_arr)) => {
103 let mut items = Vec::new();
104 let mut generated_ids = Vec::new();
105
106 for item in data_arr {
107 // UUIDv7 is time-ordered — documents inserted together will have
108 // adjacent keys, which is good for range scans.
109 let id = Uuid::now_v7().to_string();
110 generated_ids.push(id.clone());
111 // resolve_extends() handles the `extends` block for auto-keyed
112 // documents exactly the same as for named-key documents.
113 let resolved = resolve_extends(item.clone(), db);
114 items.push((id, resolved));
115 }
116
117 match db.insert(col, items) {
118 Ok(_) => {
119 // Check collection size for auto-eviction (Hybrid Bitcask).
120 if let Ok(count) = db.evict_collection(col, db.hot_threshold) {
121 if count > 0 {
122 debug!("❄️ Auto-evicted {} documents from {} to disk", count, col);
123 }
124 }
125 (200, json!({
126 "status": "ok",
127 "count": data_arr.len(),
128 "ids": generated_ids
129 }))
130 },
131 Err(engine::DbError::Conflict) => (409, json!({ "error": "Conflict: Document version is outdated", "statusCode": 409 })),
132 #[cfg(feature = "schema")]
133 Err(engine::DbError::SchemaValidationError(msg)) => (400, json!({ "error": msg, "statusCode": 400 })),
134 Err(e) => (500, json!({ "error": "Database write failed", "details": e.to_string(), "statusCode": 500 }))
135 }
136 },
137
138 // Missing or invalid data field.
139 _ => (400, json!({ "error": "Missing 'data' (object or array)", "statusCode": 400 }))
140 }
141}