Skip to main content

moltendb_core/handlers/
process_set.rs

1use tracing::debug;
2use serde_json::{Value, json};
3use crate::validation;
4use crate::engine;
5use uuid::Uuid;
6
7
8fn resolve_extends(doc: Value, db: &engine::Db) -> Value {
9    // Only objects can have an `extends` block — pass everything else through unchanged.
10    let obj = match doc.as_object() {
11        Some(o) => o,
12        None => return doc,
13    };
14
15    // If there is no `extends` key, return the document unchanged.
16    // This is the fast path for the vast majority of inserts.
17    let extends_map = match obj.get("extends").and_then(|v| v.as_object()) {
18        Some(m) => m.clone(),
19        None => return doc,
20    };
21
22    // Clone the document into a mutable map and remove the `extends` key.
23    // The stored document must never contain `extends` — it is a directive,
24    // not a data field.
25    let mut result = obj.clone();
26    result.remove("extends");
27
28    // For each alias → "collection.key" reference, fetch the referenced document
29    // and embed it under the alias key.
30    for (alias, ref_val) in &extends_map {
31        if let Some(ref_str) = ref_val.as_str() {
32            // Split "collection.key" on the FIRST dot only.
33            // This allows keys that themselves contain dots (e.g. "memory.mem4.v2").
34            if let Some(dot_pos) = ref_str.find('.') {
35                let ref_collection = &ref_str[..dot_pos];  // e.g. "memory"
36                let ref_key        = &ref_str[dot_pos + 1..]; // e.g. "mem4"
37
38                // O(1) hash-map lookup — no scanning, no joins at query time.
39                if let Some(referenced_doc) = db.get(ref_collection, vec![ref_key.to_string()]).remove(ref_key) {
40                    // Embed the full referenced document under the alias key.
41                    result.insert(alias.clone(), referenced_doc);
42                }
43                // If the reference is not found, the alias key is simply not added.
44                // We never fail the insert because of an unresolvable reference.
45            }
46        }
47    }
48
49    Value::Object(result)
50}
51
52/// Handle a SET (insert/upsert) request.
53///
54/// Accepts two data formats:
55///   - Object map: { "collection": "users", "data": { "u1": {...}, "u2": {...} } }
56///     Keys are provided by the client. Existing documents are overwritten.
57///   - Array:      { "collection": "users", "data": [ {...}, {...} ] }
58///     Keys are auto-generated as UUIDv7 strings. Returns the generated IDs.
59pub fn process_set(db: &engine::Db, payload: &Value, max_body_size: usize, max_keys_per_request: usize) -> (u16, Value) {
60    // Only "collection" and "data" are valid for a set/insert request.
61    const SET_ALLOWED: &[&str] = &["collection", "data"];
62    if let Err(e) = validation::validate_allowed_properties(payload, SET_ALLOWED) {
63        return (400, json!({ "error": e.to_string(), "statusCode": 400 }));
64    }
65    if let Err(e) = validation::validate_request(payload, max_body_size, max_keys_per_request) {
66        return (400, json!({ "error": e.to_string(), "statusCode": 400 }));
67    }
68
69    let col = payload["collection"].as_str().unwrap_or("default");
70
71    match payload.get("data") {
72        // ── Object map format ─────────────────────────────────────────────────
73        // { "data": { "u1": { "name": "Alice" }, "u2": { "name": "Bob" } } }
74        Some(Value::Object(data_map)) => {
75            // Collect all key-value pairs into a Vec for batch insert.
76            let mut items = Vec::new();
77            for (k, v) in data_map {
78                let resolved = resolve_extends(v.clone(), db);
79                items.push((k.clone(), resolved));
80            }
81
82            match db.insert(col, items) {
83                Ok(_) => {
84                    // Check collection size for auto-eviction (Hybrid Bitcask).
85                    if let Ok(count) = db.evict_collection(col, db.hot_threshold) {
86                        if count > 0 {
87                            debug!("❄️  Auto-evicted {} documents from {} to disk", count, col);
88                        }
89                    }
90                    (200, json!({ "status": "ok", "count": data_map.len() }))
91                },
92                Err(engine::DbError::Conflict) => (409, json!({ "error": "Conflict: Document version is outdated", "statusCode": 409 })),
93                #[cfg(feature = "schema")]
94                Err(engine::DbError::SchemaValidationError(msg)) => (400, json!({ "error": msg, "statusCode": 400 })),
95                Err(e) => (500, json!({ "error": "Database write failed", "details": e.to_string(), "statusCode": 500 }))
96            }
97        },
98
99        // ── Array format ──────────────────────────────────────────────────────
100        // { "data": [ { "name": "Alice" }, { "name": "Bob" } ] }
101        // Auto-generates UUIDv7 keys for each document.
102        Some(Value::Array(data_arr)) => {
103            let mut items = Vec::new();
104            let mut generated_ids = Vec::new();
105
106            for item in data_arr {
107                // UUIDv7 is time-ordered — documents inserted together will have
108                // adjacent keys, which is good for range scans.
109                let id = Uuid::now_v7().to_string();
110                generated_ids.push(id.clone());
111                // resolve_extends() handles the `extends` block for auto-keyed
112                // documents exactly the same as for named-key documents.
113                let resolved = resolve_extends(item.clone(), db);
114                items.push((id, resolved));
115            }
116
117            match db.insert(col, items) {
118                Ok(_) => {
119                    // Check collection size for auto-eviction (Hybrid Bitcask).
120                    if let Ok(count) = db.evict_collection(col, db.hot_threshold) {
121                        if count > 0 {
122                            debug!("❄️  Auto-evicted {} documents from {} to disk", count, col);
123                        }
124                    }
125                    (200, json!({
126                        "status": "ok",
127                        "count": data_arr.len(),
128                        "ids": generated_ids
129                    }))
130                },
131                Err(engine::DbError::Conflict) => (409, json!({ "error": "Conflict: Document version is outdated", "statusCode": 409 })),
132                #[cfg(feature = "schema")]
133                Err(engine::DbError::SchemaValidationError(msg)) => (400, json!({ "error": msg, "statusCode": 400 })),
134                Err(e) => (500, json!({ "error": "Database write failed", "details": e.to_string(), "statusCode": 500 }))
135            }
136        },
137
138        // Missing or invalid data field.
139        _ => (400, json!({ "error": "Missing 'data' (object or array)", "statusCode": 400 }))
140    }
141}