Skip to main content

reddb_server/application/
entity.rs

1use std::collections::HashMap;
2
3use crate::application::ports::RuntimeEntityPort;
4use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
5use crate::presentation::entity_json::storage_value_to_json;
6use crate::storage::schema::{DataType, Value};
7use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
8use crate::storage::unified::{Metadata, MetadataValue, RefTarget, SparseVector, VectorData};
9use crate::storage::{EntityId, UnifiedEntity};
10use crate::{RedDBError, RedDBResult};
11
12#[derive(Debug, Clone)]
13pub struct CreateEntityOutput {
14    pub id: EntityId,
15    pub entity: Option<UnifiedEntity>,
16}
17
18#[derive(Debug, Clone)]
19pub struct AppliedEntityMutation {
20    pub id: EntityId,
21    pub collection: String,
22    pub entity: UnifiedEntity,
23    pub metadata: Option<crate::storage::unified::Metadata>,
24    pub modified_columns: Vec<String>,
25    pub persist_metadata: bool,
26    pub context_index_dirty: bool,
27    /// Prior physical version retained for MVCC history. Present when an
28    /// UPDATE creates a new physical entity for the same logical row.
29    pub replaced_entity: Option<UnifiedEntity>,
30    /// xmax carried by `replaced_entity` before this mutation stamped it.
31    pub replaced_entity_previous_xmax: u64,
32    /// Snapshot of the row's named fields BEFORE the mutation was
33    /// applied. Carried so the post-write secondary-index hook can
34    /// `delete(old) + insert(new)` for changed indexed columns.
35    /// Empty when the entity isn't a row or the row carried neither a
36    /// `named` map nor a `schema` Arc.
37    pub pre_mutation_fields: Vec<(String, Value)>,
38}
39
40/// Damage-vector for a row update: the minimal diff between the
41/// pre-mutation and post-mutation field state. Downstream consumers
42/// (secondary-index maintainer, CDC emitter, eventually the pager's
43/// in-place update path) work off this struct so they only touch
44/// columns that actually changed.
45///
46/// Future work (Fase 5): the pager can decide whether `changed` fits
47/// in the existing cell's slotted footprint and patch bytes in place
48/// instead of rewriting the whole row.
49#[derive(Debug, Clone, Default, PartialEq)]
50pub struct RowDamageVector {
51    /// Columns present in both `old` and `new` whose value differs.
52    /// `(column, old_value, new_value)`.
53    pub changed: Vec<(String, Value, Value)>,
54    /// Columns present in `new` but not in `old`.
55    pub added: Vec<(String, Value)>,
56    /// Columns present in `old` but not in `new`.
57    pub removed: Vec<(String, Value)>,
58}
59
60impl RowDamageVector {
61    /// True when no columns changed, were added, or were removed.
62    /// Callers can short-circuit index/CDC/in-place work when this
63    /// holds.
64    pub fn is_empty(&self) -> bool {
65        self.changed.is_empty() && self.added.is_empty() && self.removed.is_empty()
66    }
67
68    /// Names of every column the update touched. Equivalent to the
69    /// current `modified_columns` list carried on `AppliedEntityMutation`
70    /// but computed from the ground truth of old/new rather than
71    /// from what the SQL executor thinks it wrote.
72    pub fn touched_columns(&self) -> Vec<&str> {
73        let mut out: Vec<&str> =
74            Vec::with_capacity(self.changed.len() + self.added.len() + self.removed.len());
75        out.extend(self.changed.iter().map(|(c, _, _)| c.as_str()));
76        out.extend(self.added.iter().map(|(c, _)| c.as_str()));
77        out.extend(self.removed.iter().map(|(c, _)| c.as_str()));
78        out
79    }
80}
81
82/// Compute the damage-vector between an old and new row snapshot.
83/// Both inputs are field lists (as carried by `CreateRowInput` and
84/// `AppliedEntityMutation.pre_mutation_fields`); the order of fields
85/// is not significant. Columns present in both sides with identical
86/// values don't appear in any bucket.
87pub fn row_damage_vector(
88    old_fields: &[(String, Value)],
89    new_fields: &[(String, Value)],
90) -> RowDamageVector {
91    // HashMap<&str, &Value> keyed by column name for O(1) membership
92    // checks. Both inputs are expected to be small (tens of columns),
93    // so the allocation overhead is negligible vs the O(N*M) pairwise
94    // comparison we'd otherwise need.
95    let old_map: HashMap<&str, &Value> = old_fields.iter().map(|(k, v)| (k.as_str(), v)).collect();
96    let new_map: HashMap<&str, &Value> = new_fields.iter().map(|(k, v)| (k.as_str(), v)).collect();
97
98    let mut changed = Vec::new();
99    let mut added = Vec::new();
100    let mut removed = Vec::new();
101
102    for (name, new_value) in &new_map {
103        match old_map.get(name) {
104            Some(old_value) if old_value == new_value => {}
105            Some(old_value) => changed.push((
106                (*name).to_string(),
107                (*old_value).clone(),
108                (*new_value).clone(),
109            )),
110            None => added.push(((*name).to_string(), (*new_value).clone())),
111        }
112    }
113    for (name, old_value) in &old_map {
114        if !new_map.contains_key(name) {
115            removed.push(((*name).to_string(), (*old_value).clone()));
116        }
117    }
118
119    RowDamageVector {
120        changed,
121        added,
122        removed,
123    }
124}
125
126#[derive(Debug, Clone)]
127pub struct RowUpdateColumnRule {
128    pub name: String,
129    pub data_type: DataType,
130    pub data_type_name: String,
131    pub not_null: bool,
132    pub enum_variants: Vec<String>,
133}
134
135#[derive(Debug, Clone)]
136pub struct RowUpdateContractPlan {
137    pub timestamps_enabled: bool,
138    pub strict_schema: bool,
139    pub declared_rules: HashMap<String, RowUpdateColumnRule>,
140    pub unique_columns: HashMap<String, ()>,
141}
142
143#[derive(Debug, Clone)]
144pub struct CreateRowInput {
145    pub collection: String,
146    pub fields: Vec<(String, Value)>,
147    pub metadata: Vec<(String, MetadataValue)>,
148    pub node_links: Vec<NodeRef>,
149    pub vector_links: Vec<VectorRef>,
150}
151
152#[derive(Debug, Clone)]
153pub struct CreateRowsBatchInput {
154    pub collection: String,
155    pub rows: Vec<CreateRowInput>,
156    /// When true, no event subscriptions fire for this batch (SUPPRESS EVENTS).
157    pub suppress_events: bool,
158}
159
/// One embedding entry carried by `CreateNodeInput::embeddings`.
#[derive(Debug, Clone)]
pub struct CreateNodeEmbeddingInput {
    /// Name given to the embedding.
    pub name: String,
    /// The embedding components.
    pub vector: Vec<f32>,
    /// Optional model identifier.
    /// NOTE(review): presumably the model that produced the vector — confirm.
    pub model: Option<String>,
}
166
167#[derive(Debug, Clone)]
168pub struct CreateNodeTableLinkInput {
169    pub key: String,
170    pub table: TableRef,
171}
172
173#[derive(Debug, Clone)]
174pub struct CreateNodeGraphLinkInput {
175    pub target: EntityId,
176    pub edge_label: String,
177    pub weight: f32,
178}
179
180#[derive(Debug, Clone)]
181pub struct CreateNodeInput {
182    pub collection: String,
183    pub label: String,
184    pub node_type: Option<String>,
185    pub properties: Vec<(String, Value)>,
186    pub metadata: Vec<(String, MetadataValue)>,
187    pub embeddings: Vec<CreateNodeEmbeddingInput>,
188    pub table_links: Vec<CreateNodeTableLinkInput>,
189    pub node_links: Vec<CreateNodeGraphLinkInput>,
190}
191
192#[derive(Debug, Clone)]
193pub struct CreateEdgeInput {
194    pub collection: String,
195    pub label: String,
196    pub from: EntityId,
197    pub to: EntityId,
198    pub weight: Option<f32>,
199    pub properties: Vec<(String, Value)>,
200    pub metadata: Vec<(String, MetadataValue)>,
201}
202
203#[derive(Debug, Clone)]
204pub struct CreateVectorInput {
205    pub collection: String,
206    pub dense: Vec<f32>,
207    pub content: Option<String>,
208    pub metadata: Vec<(String, MetadataValue)>,
209    pub link_row: Option<TableRef>,
210    pub link_node: Option<NodeRef>,
211}
212
213#[derive(Debug, Clone)]
214pub struct CreateDocumentInput {
215    pub collection: String,
216    pub body: JsonValue,
217    pub metadata: Vec<(String, MetadataValue)>,
218    pub node_links: Vec<NodeRef>,
219    pub vector_links: Vec<VectorRef>,
220}
221
222#[derive(Debug, Clone)]
223pub struct CreateKvInput {
224    pub collection: String,
225    pub key: String,
226    pub value: Value,
227    pub metadata: Vec<(String, MetadataValue)>,
228}
229
230#[derive(Debug, Clone)]
231pub struct CreateTimeSeriesPointInput {
232    pub collection: String,
233    pub metric: String,
234    pub value: f64,
235    pub timestamp_ns: Option<u64>,
236    pub tags: Vec<(String, String)>,
237    pub metadata: Vec<(String, MetadataValue)>,
238}
239
240#[derive(Debug, Clone)]
241pub struct DeleteEntityInput {
242    pub collection: String,
243    pub id: EntityId,
244}
245
/// Kind of a single patch operation.
///
/// `apply_patch_operations_to_json` handles `Set` and `Replace` through the
/// same code path (both require a value); `Unset` removes the path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PatchEntityOperationType {
    /// Write a value at the path.
    Set,
    /// Write a value at the path (processed like `Set` in this file).
    Replace,
    /// Remove whatever is at the path.
    Unset,
}
252
253#[derive(Debug, Clone)]
254pub struct PatchEntityOperation {
255    pub op: PatchEntityOperationType,
256    pub path: Vec<String>,
257    pub value: Option<JsonValue>,
258}
259
260#[derive(Debug, Clone)]
261pub struct PatchEntityInput {
262    pub collection: String,
263    pub id: EntityId,
264    pub payload: JsonValue,
265    pub operations: Vec<PatchEntityOperation>,
266}
267
268#[derive(Debug, Clone, Copy, PartialEq, Eq)]
269pub struct DeleteEntityOutput {
270    pub deleted: bool,
271    pub id: EntityId,
272}
273
/// Application-layer entry point for entity operations; holds a borrowed
/// runtime port and forwards every call to it.
pub struct EntityUseCases<'a, P: ?Sized> {
    /// The port all operations are delegated to.
    runtime: &'a P,
}
277
278impl<'a, P: RuntimeEntityPort + ?Sized> EntityUseCases<'a, P> {
279    pub fn new(runtime: &'a P) -> Self {
280        Self { runtime }
281    }
282
283    pub fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
284        self.runtime.create_row(input)
285    }
286
287    pub fn create_rows_batch(
288        &self,
289        input: CreateRowsBatchInput,
290    ) -> RedDBResult<Vec<CreateEntityOutput>> {
291        self.runtime.create_rows_batch(input)
292    }
293
294    pub fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
295        self.runtime.create_node(input)
296    }
297
298    pub fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
299        self.runtime.create_edge(input)
300    }
301
302    pub fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
303        self.runtime.create_vector(input)
304    }
305
306    pub fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
307        self.runtime.create_document(input)
308    }
309
310    pub fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
311        self.runtime.create_kv(input)
312    }
313
314    pub fn create_timeseries_point(
315        &self,
316        input: CreateTimeSeriesPointInput,
317    ) -> RedDBResult<CreateEntityOutput> {
318        self.runtime.create_timeseries_point(input)
319    }
320
321    pub fn get_kv(&self, collection: &str, key: &str) -> RedDBResult<Option<(Value, EntityId)>> {
322        self.runtime.get_kv(collection, key)
323    }
324
325    pub fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
326        self.runtime.delete_kv(collection, key)
327    }
328
329    pub fn patch(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
330        self.runtime.patch_entity(input)
331    }
332
333    pub fn delete(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
334        self.runtime.delete_entity(input)
335    }
336}
337
338pub(crate) fn json_to_storage_value(value: &JsonValue) -> RedDBResult<Value> {
339    match value {
340        JsonValue::Null => Ok(Value::Null),
341        JsonValue::Bool(value) => Ok(Value::Boolean(*value)),
342        JsonValue::Number(value) => {
343            if value.fract().abs() < f64::EPSILON {
344                Ok(Value::Integer(*value as i64))
345            } else {
346                Ok(Value::Float(*value))
347            }
348        }
349        JsonValue::String(value) => Ok(Value::text(value.clone())),
350        JsonValue::Array(_) | JsonValue::Object(_) => json_to_vec(value)
351            .map(Value::Json)
352            .map_err(|err| RedDBError::Query(format!("failed to serialize JSON value: {err}"))),
353    }
354}
355
356pub(crate) fn json_to_metadata_value(value: &JsonValue) -> RedDBResult<MetadataValue> {
357    match value {
358        JsonValue::Null => Ok(MetadataValue::Null),
359        JsonValue::Bool(value) => Ok(MetadataValue::Bool(*value)),
360        JsonValue::Number(value) => {
361            if value.fract().abs() < f64::EPSILON {
362                Ok(MetadataValue::Int(*value as i64))
363            } else {
364                Ok(MetadataValue::Float(*value))
365            }
366        }
367        JsonValue::String(value) => Ok(MetadataValue::String(value.clone())),
368        JsonValue::Array(values) => {
369            let mut items = Vec::with_capacity(values.len());
370            for value in values {
371                items.push(json_to_metadata_value(value)?);
372            }
373            Ok(MetadataValue::Array(items))
374        }
375        JsonValue::Object(map) => {
376            let mut object = HashMap::with_capacity(map.len());
377            for (key, value) in map {
378                object.insert(key.clone(), json_to_metadata_value(value)?);
379            }
380            Ok(MetadataValue::Object(object))
381        }
382    }
383}
384
385pub(crate) fn apply_patch_operations_to_storage_map(
386    fields: &mut HashMap<String, Value>,
387    operations: &[PatchEntityOperation],
388) -> RedDBResult<()> {
389    if operations.is_empty() {
390        return Ok(());
391    }
392
393    let mut patch_target = JsonValue::Object(
394        fields
395            .iter()
396            .map(|(key, value)| (key.clone(), storage_value_to_json(value)))
397            .collect(),
398    );
399    apply_patch_operations_to_json(&mut patch_target, operations)
400        .map_err(|error| RedDBError::Query(format!("patch fields failed: {error}")))?;
401
402    let JsonValue::Object(object) = patch_target else {
403        return Err(RedDBError::Query(
404            "patch operations require object roots".to_string(),
405        ));
406    };
407
408    let mut merged = HashMap::with_capacity(object.len());
409    for (key, value) in object {
410        merged.insert(key, json_to_storage_value(&value)?);
411    }
412    *fields = merged;
413    Ok(())
414}
415
416pub(crate) fn apply_patch_operations_to_json(
417    value: &mut JsonValue,
418    operations: &[PatchEntityOperation],
419) -> Result<(), String> {
420    for operation in operations {
421        if operation.path.is_empty() {
422            return Err("patch path cannot be empty".to_string());
423        }
424
425        match operation.op {
426            PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
427                let Some(patch_value) = &operation.value else {
428                    return Err("set/replace operations require a value".to_string());
429                };
430                apply_patch_json_set(value, &operation.path, patch_value.clone())?;
431            }
432            PatchEntityOperationType::Unset => {
433                apply_patch_json_unset(value, &operation.path)?;
434            }
435        }
436    }
437    Ok(())
438}
439
440pub(crate) fn apply_patch_operations_to_vector_fields(
441    vector: &mut VectorData,
442    operations: &[PatchEntityOperation],
443) -> RedDBResult<()> {
444    if operations.is_empty() {
445        return Ok(());
446    }
447
448    let mut vector_target = JsonValue::Object({
449        let mut object = Map::new();
450        object.insert(
451            "dense".to_string(),
452            JsonValue::Array(
453                vector
454                    .dense
455                    .iter()
456                    .map(|value| JsonValue::Number(*value as f64))
457                    .collect(),
458            ),
459        );
460        object.insert(
461            "sparse".to_string(),
462            vector.sparse.as_ref().map_or(JsonValue::Null, |sparse| {
463                let mut object = Map::new();
464                object.insert(
465                    "indices".to_string(),
466                    JsonValue::Array(
467                        sparse
468                            .indices
469                            .iter()
470                            .map(|value| JsonValue::Number(*value as f64))
471                            .collect(),
472                    ),
473                );
474                object.insert(
475                    "values".to_string(),
476                    JsonValue::Array(
477                        sparse
478                            .values
479                            .iter()
480                            .map(|value| JsonValue::Number(*value as f64))
481                            .collect(),
482                    ),
483                );
484                object.insert(
485                    "dimension".to_string(),
486                    JsonValue::Number(sparse.dimension as f64),
487                );
488                JsonValue::Object(object)
489            }),
490        );
491        object.insert(
492            "content".to_string(),
493            match vector.content.as_ref() {
494                Some(value) => JsonValue::String(value.clone()),
495                None => JsonValue::Null,
496            },
497        );
498        object
499    });
500
501    let touched_dense = operations
502        .iter()
503        .any(|operation| operation.path.first().is_some_and(|key| key == "dense"));
504    let touched_sparse = operations
505        .iter()
506        .any(|operation| operation.path.first().is_some_and(|key| key == "sparse"));
507    let touched_content = operations
508        .iter()
509        .any(|operation| operation.path.first().is_some_and(|key| key == "content"));
510
511    apply_patch_operations_to_json(&mut vector_target, operations)
512        .map_err(|error| RedDBError::Query(format!("patch fields failed: {error}")))?;
513
514    let JsonValue::Object(object) = vector_target else {
515        return Err(RedDBError::Query(
516            "patch operations require object roots".to_string(),
517        ));
518    };
519
520    if touched_dense {
521        let Some(value) = object.get("dense") else {
522            return Err(RedDBError::Query(
523                "field 'dense' cannot be unset".to_string(),
524            ));
525        };
526        vector.dense = parse_patch_f32_vector(value, "dense")?;
527    }
528
529    if touched_content {
530        vector.content = match object.get("content") {
531            None | Some(JsonValue::Null) => None,
532            Some(value) => Some(
533                value
534                    .as_str()
535                    .ok_or_else(|| {
536                        RedDBError::Query("field 'content' must be a string".to_string())
537                    })?
538                    .to_string(),
539            ),
540        };
541    }
542
543    if touched_sparse {
544        vector.sparse = match object.get("sparse") {
545            Some(value) => parse_sparse_vector_value(value)?,
546            None => None,
547        };
548    }
549
550    Ok(())
551}
552
553pub(crate) fn metadata_to_json(metadata: &Metadata) -> JsonValue {
554    JsonValue::Object(
555        metadata
556            .iter()
557            .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
558            .collect(),
559    )
560}
561
562pub(crate) fn metadata_from_json(payload: &JsonValue) -> RedDBResult<Metadata> {
563    let JsonValue::Object(object) = payload else {
564        return Err(RedDBError::Query(
565            "metadata patch requires an object".to_string(),
566        ));
567    };
568
569    let mut metadata = Metadata::new();
570    for (key, value) in object {
571        metadata.set(key.clone(), metadata_value_from_json(value)?);
572    }
573    Ok(metadata)
574}
575
576fn metadata_value_to_json(value: &MetadataValue) -> JsonValue {
577    match value {
578        MetadataValue::Null => JsonValue::Null,
579        MetadataValue::Bool(value) => JsonValue::Bool(*value),
580        MetadataValue::Int(value) => JsonValue::Number(*value as f64),
581        MetadataValue::Float(value) => JsonValue::Number(*value),
582        MetadataValue::String(value) => JsonValue::String(value.clone()),
583        MetadataValue::Bytes(value) => {
584            let mut object = Map::new();
585            object.insert(
586                "__redb_type".to_string(),
587                JsonValue::String("bytes".to_string()),
588            );
589            object.insert(
590                "value".to_string(),
591                JsonValue::Array(
592                    value
593                        .iter()
594                        .map(|value| JsonValue::Number(*value as f64))
595                        .collect(),
596                ),
597            );
598            JsonValue::Object(object)
599        }
600        MetadataValue::Array(values) => {
601            JsonValue::Array(values.iter().map(metadata_value_to_json).collect())
602        }
603        MetadataValue::Object(object) => JsonValue::Object(
604            object
605                .iter()
606                .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
607                .collect(),
608        ),
609        MetadataValue::Timestamp(value) => JsonValue::Number(*value as f64),
610        MetadataValue::Geo { lat, lon } => {
611            let mut object = Map::new();
612            object.insert(
613                "__redb_type".to_string(),
614                JsonValue::String("geo".to_string()),
615            );
616            object.insert("lat".to_string(), JsonValue::Number(*lat));
617            object.insert("lon".to_string(), JsonValue::Number(*lon));
618            JsonValue::Object(object)
619        }
620        MetadataValue::Reference(value) => {
621            let mut object = Map::new();
622            object.insert(
623                "__redb_type".to_string(),
624                JsonValue::String("reference".to_string()),
625            );
626            let (kind, collection, id) = match value {
627                RefTarget::TableRow { table, row_id } => ("table_row", table.as_str(), *row_id),
628                RefTarget::Node {
629                    collection,
630                    node_id,
631                } => ("node", collection.as_str(), node_id.raw()),
632                RefTarget::Edge {
633                    collection,
634                    edge_id,
635                } => ("edge", collection.as_str(), edge_id.raw()),
636                RefTarget::Vector {
637                    collection,
638                    vector_id,
639                } => ("vector", collection.as_str(), vector_id.raw()),
640                RefTarget::Entity {
641                    collection,
642                    entity_id,
643                } => ("entity", collection.as_str(), entity_id.raw()),
644            };
645            object.insert("kind".to_string(), JsonValue::String(kind.to_string()));
646            object.insert(
647                "collection".to_string(),
648                JsonValue::String(collection.to_string()),
649            );
650            object.insert("id".to_string(), JsonValue::Number(id as f64));
651            JsonValue::Object(object)
652        }
653        MetadataValue::References(values) => {
654            let mut object = Map::new();
655            object.insert(
656                "__redb_type".to_string(),
657                JsonValue::String("references".to_string()),
658            );
659            object.insert(
660                "values".to_string(),
661                JsonValue::Array(
662                    values
663                        .iter()
664                        .map(|r| metadata_value_to_json(&MetadataValue::Reference(r.clone())))
665                        .collect(),
666                ),
667            );
668            JsonValue::Object(object)
669        }
670    }
671}
672
673fn metadata_value_from_json(value: &JsonValue) -> RedDBResult<MetadataValue> {
674    match value {
675        JsonValue::Null => Ok(MetadataValue::Null),
676        JsonValue::Bool(value) => Ok(MetadataValue::Bool(*value)),
677        JsonValue::Number(value) => {
678            if value.fract().abs() < f64::EPSILON {
679                Ok(MetadataValue::Int(*value as i64))
680            } else {
681                Ok(MetadataValue::Float(*value))
682            }
683        }
684        JsonValue::String(value) => Ok(MetadataValue::String(value.clone())),
685        JsonValue::Array(values) => {
686            let mut out = Vec::with_capacity(values.len());
687            for value in values {
688                out.push(metadata_value_from_json(value)?);
689            }
690            Ok(MetadataValue::Array(out))
691        }
692        JsonValue::Object(object) => {
693            if let Some(marker) = object.get("__redb_type").and_then(JsonValue::as_str) {
694                match marker {
695                    "bytes" => {
696                        let values = object
697                            .get("value")
698                            .and_then(JsonValue::as_array)
699                            .ok_or_else(|| {
700                                RedDBError::Query(
701                                    "metadata marker 'bytes' requires array value".to_string(),
702                                )
703                            })?;
704                        let mut out = Vec::with_capacity(values.len());
705                        for value in values {
706                            let value = value.as_i64().ok_or_else(|| {
707                                RedDBError::Query(
708                                    "metadata bytes must contain integer values".to_string(),
709                                )
710                            })?;
711                            if !(0..=255).contains(&value) {
712                                return Err(RedDBError::Query(
713                                    "metadata bytes must contain values between 0 and 255"
714                                        .to_string(),
715                                ));
716                            }
717                            out.push(value as u8);
718                        }
719                        return Ok(MetadataValue::Bytes(out));
720                    }
721                    "geo" => {
722                        let lat =
723                            object
724                                .get("lat")
725                                .and_then(JsonValue::as_f64)
726                                .ok_or_else(|| {
727                                    RedDBError::Query(
728                                        "metadata marker 'geo' requires numeric 'lat'".to_string(),
729                                    )
730                                })?;
731                        let lon =
732                            object
733                                .get("lon")
734                                .and_then(JsonValue::as_f64)
735                                .ok_or_else(|| {
736                                    RedDBError::Query(
737                                        "metadata marker 'geo' requires numeric 'lon'".to_string(),
738                                    )
739                                })?;
740                        return Ok(MetadataValue::Geo { lat, lon });
741                    }
742                    "reference" => {
743                        return parse_metadata_reference(object).map(MetadataValue::Reference)
744                    }
745                    "references" => {
746                        let values = object
747                            .get("values")
748                            .and_then(JsonValue::as_array)
749                            .ok_or_else(|| {
750                                RedDBError::Query(
751                                    "metadata marker 'references' requires array 'values'"
752                                        .to_string(),
753                                )
754                            })?;
755                        let mut references = Vec::with_capacity(values.len());
756                        for value in values {
757                            references.push(parse_metadata_reference_value(value)?);
758                        }
759                        return Ok(MetadataValue::References(references));
760                    }
761                    _ => {}
762                }
763            }
764
765            let mut out = HashMap::with_capacity(object.len());
766            for (key, value) in object {
767                out.insert(key.clone(), metadata_value_from_json(value)?);
768            }
769            Ok(MetadataValue::Object(out))
770        }
771    }
772}
773
774fn parse_metadata_reference(object: &Map<String, JsonValue>) -> RedDBResult<RefTarget> {
775    let kind = object
776        .get("kind")
777        .and_then(JsonValue::as_str)
778        .ok_or_else(|| RedDBError::Query("metadata reference requires 'kind'".to_string()))?;
779    let collection = object
780        .get("collection")
781        .and_then(JsonValue::as_str)
782        .ok_or_else(|| RedDBError::Query("metadata reference requires 'collection'".to_string()))?;
783    let id = object
784        .get("id")
785        .ok_or_else(|| RedDBError::Query("metadata reference requires 'id'".to_string()))?;
786    let id = parse_patch_u64_value(id, "id")?;
787
788    let target = match kind {
789        "table_row" | "table" => RefTarget::table(collection.to_string(), id),
790        "node" => RefTarget::node(collection.to_string(), EntityId::new(id)),
791        "edge" => RefTarget::Edge {
792            collection: collection.to_string(),
793            edge_id: EntityId::new(id),
794        },
795        "vector" => RefTarget::vector(collection.to_string(), EntityId::new(id)),
796        "entity" => RefTarget::Entity {
797            collection: collection.to_string(),
798            entity_id: EntityId::new(id),
799        },
800        _ => {
801            return Err(RedDBError::Query(format!(
802                "unsupported metadata reference kind '{kind}'"
803            )));
804        }
805    };
806
807    Ok(target)
808}
809
810fn parse_metadata_reference_value(value: &JsonValue) -> RedDBResult<RefTarget> {
811    let JsonValue::Object(object) = value else {
812        return Err(RedDBError::Query(
813            "metadata reference entries must be objects".to_string(),
814        ));
815    };
816    parse_metadata_reference(object)
817}
818
819fn parse_patch_u64_value(value: &JsonValue, field: &str) -> RedDBResult<u64> {
820    let Some(value) = value.as_f64() else {
821        return Err(RedDBError::Query(format!(
822            "field '{field}' must be a number"
823        )));
824    };
825    if value.is_sign_negative() {
826        return Err(RedDBError::Query(format!(
827            "field '{field}' cannot be negative"
828        )));
829    }
830    if value.fract().abs() > f64::EPSILON {
831        return Err(RedDBError::Query(format!(
832            "field '{field}' must be an integer"
833        )));
834    }
835    if value > u64::MAX as f64 {
836        return Err(RedDBError::Query(format!("field '{field}' is too large")));
837    }
838    Ok(value as u64)
839}
840
841fn parse_patch_f32_vector(value: &JsonValue, field: &str) -> RedDBResult<Vec<f32>> {
842    let values = value
843        .as_array()
844        .ok_or_else(|| RedDBError::Query(format!("field '{field}' must be an array")))?;
845    let mut out = Vec::with_capacity(values.len());
846    for value in values {
847        let number = value.as_f64().ok_or_else(|| {
848            RedDBError::Query(format!("field '{field}' must contain only numbers"))
849        })?;
850        out.push(number as f32);
851    }
852    if out.is_empty() {
853        return Err(RedDBError::Query(format!(
854            "field '{field}' cannot be empty"
855        )));
856    }
857    Ok(out)
858}
859
860fn parse_sparse_index_array(value: &JsonValue, field: &str) -> RedDBResult<Vec<u32>> {
861    let values = value
862        .as_array()
863        .ok_or_else(|| RedDBError::Query(format!("field '{field}' must be an array")))?;
864    let mut out = Vec::with_capacity(values.len());
865    for value in values {
866        let value = value.as_f64().ok_or_else(|| {
867            RedDBError::Query(format!("field '{field}' must contain only integers"))
868        })?;
869        if value.is_sign_negative() || value.fract().abs() > f64::EPSILON {
870            return Err(RedDBError::Query(format!(
871                "field '{field}' must contain only u32 values"
872            )));
873        }
874        if value > u32::MAX as f64 {
875            return Err(RedDBError::Query(format!(
876                "field '{field}' value is too large"
877            )));
878        }
879        out.push(value as u32);
880    }
881    Ok(out)
882}
883
/// Parses the `values` array of a sparse vector.
///
/// Delegates to `parse_patch_f32_vector` unchanged, so the input must be a
/// non-empty JSON array of numbers; `field` labels any error message.
fn parse_sparse_value_array(value: &JsonValue, field: &str) -> RedDBResult<Vec<f32>> {
    parse_patch_f32_vector(value, field)
}
887
888fn parse_sparse_vector_value(value: &JsonValue) -> RedDBResult<Option<SparseVector>> {
889    match value {
890        JsonValue::Null => Ok(None),
891        JsonValue::Object(object) => {
892            let indices = parse_sparse_index_array(
893                object.get("indices").ok_or_else(|| {
894                    RedDBError::Query("sparse metadata requires 'indices'".to_string())
895                })?,
896                "sparse.indices",
897            )?;
898            let values = parse_sparse_value_array(
899                object.get("values").ok_or_else(|| {
900                    RedDBError::Query("sparse metadata requires 'values'".to_string())
901                })?,
902                "sparse.values",
903            )?;
904            if indices.len() != values.len() {
905                return Err(RedDBError::Query(
906                    "sparse indices and values lengths must match".to_string(),
907                ));
908            }
909            let dimension = match object.get("dimension").and_then(JsonValue::as_f64) {
910                Some(value) => {
911                    if value.is_sign_negative() || value.fract().abs() > f64::EPSILON {
912                        return Err(RedDBError::Query(
913                            "sparse dimension must be a non-negative integer".to_string(),
914                        ));
915                    }
916                    if value > usize::MAX as f64 {
917                        return Err(RedDBError::Query(
918                            "sparse dimension is too large".to_string(),
919                        ));
920                    }
921                    value as usize
922                }
923                None => indices
924                    .iter()
925                    .max()
926                    .map_or(0, |index| (*index as usize) + 1),
927            };
928            if indices.iter().any(|index| (*index as usize) >= dimension) {
929                return Err(RedDBError::Query(
930                    "sparse indices must be smaller than dimension".to_string(),
931                ));
932            }
933            Ok(Some(SparseVector::new(indices, values, dimension)))
934        }
935        _ => Err(RedDBError::Query(
936            "field 'sparse' must be an object or null".to_string(),
937        )),
938    }
939}
940
941fn apply_patch_json_set(
942    target: &mut JsonValue,
943    path: &[String],
944    value: JsonValue,
945) -> Result<(), String> {
946    if path.is_empty() {
947        return Err("patch path cannot be empty".to_string());
948    }
949
950    let mut current = target;
951    for segment in &path[..path.len() - 1] {
952        let JsonValue::Object(object) = current else {
953            return Err("patch path target must be an object".to_string());
954        };
955        let value = object
956            .entry(segment.clone())
957            .or_insert_with(|| JsonValue::Object(Map::new()));
958        if !matches!(value, JsonValue::Object(_)) {
959            *value = JsonValue::Object(Map::new());
960        }
961        current = value;
962    }
963
964    let JsonValue::Object(object) = current else {
965        return Err("patch path target must be an object".to_string());
966    };
967    object.insert(path[path.len() - 1].clone(), value);
968    Ok(())
969}
970
971fn apply_patch_json_unset(target: &mut JsonValue, path: &[String]) -> Result<(), String> {
972    if path.is_empty() {
973        return Err("patch path cannot be empty".to_string());
974    }
975
976    if path.len() == 1 {
977        let JsonValue::Object(object) = target else {
978            return Err("patch path target must be an object".to_string());
979        };
980        object.remove(&path[0]);
981        return Ok(());
982    }
983
984    let mut current = target;
985    for segment in &path[..path.len() - 1] {
986        let Some(value) = (match current {
987            JsonValue::Object(object) => object.get_mut(segment),
988            _ => {
989                return Err("patch path target must be an object".to_string());
990            }
991        }) else {
992            return Ok(());
993        };
994
995        if !matches!(value, JsonValue::Object(_)) {
996            return Ok(());
997        }
998        current = value;
999    }
1000
1001    let JsonValue::Object(object) = current else {
1002        return Err("patch path target must be an object".to_string());
1003    };
1004    object.remove(&path[path.len() - 1]);
1005    Ok(())
1006}
1007
/// Renders a 6-byte MAC address as lowercase, colon-separated hex pairs,
/// e.g. `00:1a:2b:3c:4d:ff`.
fn format_mac(bytes: &[u8; 6]) -> String {
    let [a, b, c, d, e, f] = bytes;
    format!("{a:02x}:{b:02x}:{c:02x}:{d:02x}:{e:02x}:{f:02x}")
}
1015
#[cfg(test)]
mod damage_vector_tests {
    //! Unit tests for `row_damage_vector`: each test builds an "old" and a
    //! "new" row and checks how the diff is split across `changed`,
    //! `added` and `removed`.

    use super::*;

    /// Builds one named field of a row.
    fn field(name: &str, value: Value) -> (String, Value) {
        (name.to_string(), value)
    }

    /// Builds a text value from a string slice.
    fn text(content: &str) -> Value {
        Value::text(content.to_string())
    }

    #[test]
    fn identical_rows_produce_empty_vector() {
        let before = vec![
            field("name", text("alice")),
            field("age", Value::Integer(30)),
        ];
        let after = before.clone();

        let damage = row_damage_vector(&before, &after);

        assert!(damage.is_empty());
        assert!(damage.touched_columns().is_empty());
    }

    #[test]
    fn detects_changed_column_only() {
        let before = vec![
            field("name", text("alice")),
            field("age", Value::Integer(30)),
        ];
        let after = vec![
            field("name", text("alice")),
            field("age", Value::Integer(31)),
        ];

        let damage = row_damage_vector(&before, &after);

        assert_eq!(damage.changed.len(), 1);
        assert_eq!(damage.changed[0].0, "age");
        assert!(damage.added.is_empty());
        assert!(damage.removed.is_empty());
        assert_eq!(damage.touched_columns(), vec!["age"]);
    }

    #[test]
    fn detects_added_and_removed_columns() {
        let before = vec![
            field("name", text("alice")),
            field("nickname", text("al")),
        ];
        let after = vec![
            field("name", text("alice")),
            field("email", text("a@x.com")),
        ];

        let damage = row_damage_vector(&before, &after);

        assert!(damage.changed.is_empty());
        assert_eq!(damage.added.len(), 1);
        assert_eq!(damage.added[0].0, "email");
        assert_eq!(damage.removed.len(), 1);
        assert_eq!(damage.removed[0].0, "nickname");
    }

    #[test]
    fn field_order_does_not_affect_diff() {
        // The same fields listed in a different order must diff as empty:
        // column order is a presentation concern.
        let before = vec![
            field("name", text("bob")),
            field("age", Value::Integer(42)),
        ];
        let after = vec![
            field("age", Value::Integer(42)),
            field("name", text("bob")),
        ];

        assert!(row_damage_vector(&before, &after).is_empty());
    }

    #[test]
    fn mixed_changed_added_removed() {
        let before = vec![
            field("a", Value::Integer(1)),
            field("b", Value::Integer(2)),
            field("gone", text("x")),
        ];
        let after = vec![
            field("a", Value::Integer(10)),     // changed
            field("b", Value::Integer(2)),      // unchanged
            field("new", Value::Boolean(true)), // added
        ];

        let damage = row_damage_vector(&before, &after);

        assert_eq!(damage.changed.len(), 1);
        assert_eq!(damage.added.len(), 1);
        assert_eq!(damage.removed.len(), 1);

        // Sort before comparing so the assertion does not depend on the
        // order in which the touched columns are reported.
        let mut touched: Vec<&str> = damage.touched_columns();
        touched.sort();
        assert_eq!(touched, vec!["a", "gone", "new"]);
    }
}