Skip to main content

reddb_server/application/
entity.rs

1use std::collections::HashMap;
2
3use crate::application::ports::RuntimeEntityPort;
4use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
5use crate::presentation::entity_json::storage_value_to_json;
6use crate::storage::schema::{DataType, Value};
7use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
8use crate::storage::unified::{Metadata, MetadataValue, RefTarget, SparseVector, VectorData};
9use crate::storage::{EntityId, UnifiedEntity};
10use crate::{RedDBError, RedDBResult};
11
/// Result returned by the entity create/patch use cases on
/// [`EntityUseCases`].
#[derive(Debug, Clone)]
pub struct CreateEntityOutput {
    /// Identifier of the entity the operation produced or touched.
    pub id: EntityId,
    /// The entity itself, when the runtime hands it back with the id.
    /// NOTE(review): which operations populate this is decided by the
    /// runtime port and is not visible here — confirm before relying on it.
    pub entity: Option<UnifiedEntity>,
}
17
/// Description of a mutation that has been applied to one entity,
/// carrying both the resulting state and the bookkeeping needed by
/// post-write hooks (secondary indexes, MVCC history).
#[derive(Debug, Clone)]
pub struct AppliedEntityMutation {
    /// Identifier of the mutated entity.
    pub id: EntityId,
    /// Name of the collection the entity lives in.
    pub collection: String,
    /// The entity as it stands after the mutation.
    pub entity: UnifiedEntity,
    /// Metadata associated with the mutation, if any.
    /// NOTE(review): presumably the post-mutation metadata — confirm.
    pub metadata: Option<crate::storage::unified::Metadata>,
    /// Column names the SQL executor reports having written. See
    /// [`RowDamageVector::touched_columns`] for the variant computed
    /// from the actual old/new field values.
    pub modified_columns: Vec<String>,
    /// NOTE(review): presumably signals that `metadata` must be written
    /// back to storage — confirm against the consumer.
    pub persist_metadata: bool,
    /// NOTE(review): presumably signals that the context index needs a
    /// refresh for this entity — confirm against the consumer.
    pub context_index_dirty: bool,
    /// Prior physical version retained for MVCC history. Present when an
    /// UPDATE creates a new physical entity for the same logical row.
    pub replaced_entity: Option<UnifiedEntity>,
    /// xmax carried by `replaced_entity` before this mutation stamped it.
    pub replaced_entity_previous_xmax: u64,
    /// Snapshot of the row's named fields BEFORE the mutation was
    /// applied. Carried so the post-write secondary-index hook can
    /// `delete(old) + insert(new)` for changed indexed columns.
    /// Empty when the entity isn't a row or the row carried neither a
    /// `named` map nor a `schema` Arc.
    pub pre_mutation_fields: Vec<(String, Value)>,
}
39
40/// Damage-vector for a row update: the minimal diff between the
41/// pre-mutation and post-mutation field state. Downstream consumers
42/// (secondary-index maintainer, CDC emitter, eventually the pager's
43/// in-place update path) work off this struct so they only touch
44/// columns that actually changed.
45///
46/// Future work (Fase 5): the pager can decide whether `changed` fits
47/// in the existing cell's slotted footprint and patch bytes in place
48/// instead of rewriting the whole row.
49#[derive(Debug, Clone, Default, PartialEq)]
50pub struct RowDamageVector {
51    /// Columns present in both `old` and `new` whose value differs.
52    /// `(column, old_value, new_value)`.
53    pub changed: Vec<(String, Value, Value)>,
54    /// Columns present in `new` but not in `old`.
55    pub added: Vec<(String, Value)>,
56    /// Columns present in `old` but not in `new`.
57    pub removed: Vec<(String, Value)>,
58}
59
60impl RowDamageVector {
61    /// True when no columns changed, were added, or were removed.
62    /// Callers can short-circuit index/CDC/in-place work when this
63    /// holds.
64    pub fn is_empty(&self) -> bool {
65        self.changed.is_empty() && self.added.is_empty() && self.removed.is_empty()
66    }
67
68    /// Names of every column the update touched. Equivalent to the
69    /// current `modified_columns` list carried on `AppliedEntityMutation`
70    /// but computed from the ground truth of old/new rather than
71    /// from what the SQL executor thinks it wrote.
72    pub fn touched_columns(&self) -> Vec<&str> {
73        let mut out: Vec<&str> =
74            Vec::with_capacity(self.changed.len() + self.added.len() + self.removed.len());
75        out.extend(self.changed.iter().map(|(c, _, _)| c.as_str()));
76        out.extend(self.added.iter().map(|(c, _)| c.as_str()));
77        out.extend(self.removed.iter().map(|(c, _)| c.as_str()));
78        out
79    }
80}
81
82/// Compute the damage-vector between an old and new row snapshot.
83/// Both inputs are field lists (as carried by `CreateRowInput` and
84/// `AppliedEntityMutation.pre_mutation_fields`); the order of fields
85/// is not significant. Columns present in both sides with identical
86/// values don't appear in any bucket.
87pub fn row_damage_vector(
88    old_fields: &[(String, Value)],
89    new_fields: &[(String, Value)],
90) -> RowDamageVector {
91    // HashMap<&str, &Value> keyed by column name for O(1) membership
92    // checks. Both inputs are expected to be small (tens of columns),
93    // so the allocation overhead is negligible vs the O(N*M) pairwise
94    // comparison we'd otherwise need.
95    let old_map: HashMap<&str, &Value> = old_fields.iter().map(|(k, v)| (k.as_str(), v)).collect();
96    let new_map: HashMap<&str, &Value> = new_fields.iter().map(|(k, v)| (k.as_str(), v)).collect();
97
98    let mut changed = Vec::new();
99    let mut added = Vec::new();
100    let mut removed = Vec::new();
101
102    for (name, new_value) in &new_map {
103        match old_map.get(name) {
104            Some(old_value) if old_value == new_value => {}
105            Some(old_value) => changed.push((
106                (*name).to_string(),
107                (*old_value).clone(),
108                (*new_value).clone(),
109            )),
110            None => added.push(((*name).to_string(), (*new_value).clone())),
111        }
112    }
113    for (name, old_value) in &old_map {
114        if !new_map.contains_key(name) {
115            removed.push(((*name).to_string(), (*old_value).clone()));
116        }
117    }
118
119    RowDamageVector {
120        changed,
121        added,
122        removed,
123    }
124}
125
/// Declared rule for a single column, consulted when validating a row
/// update (see [`RowUpdateContractPlan::declared_rules`]).
#[derive(Debug, Clone)]
pub struct RowUpdateColumnRule {
    /// Column name.
    pub name: String,
    /// Declared storage type of the column.
    pub data_type: DataType,
    /// Textual name of the declared type.
    /// NOTE(review): presumably kept for error messages — confirm.
    pub data_type_name: String,
    /// Whether the column is declared NOT NULL.
    pub not_null: bool,
    /// Allowed variants for the column.
    /// NOTE(review): presumably only meaningful for enum-typed columns
    /// and empty otherwise — confirm.
    pub enum_variants: Vec<String>,
}
134
/// Bundle of per-collection settings and column rules used when
/// applying a row update.
#[derive(Debug, Clone)]
pub struct RowUpdateContractPlan {
    /// NOTE(review): presumably enables automatic timestamp columns for
    /// the collection — confirm against the consumer.
    pub timestamps_enabled: bool,
    /// NOTE(review): presumably rejects columns without a declared rule
    /// when set — confirm against the consumer.
    pub strict_schema: bool,
    /// Declared column rules.
    /// NOTE(review): presumably keyed by column name — confirm.
    pub declared_rules: HashMap<String, RowUpdateColumnRule>,
    /// Columns flagged as unique. The map carries no values (`()`), so
    /// it is used purely as a set of names.
    pub unique_columns: HashMap<String, ()>,
}
142
/// Input for [`EntityUseCases::create_row`]: one table row to insert.
#[derive(Debug, Clone)]
pub struct CreateRowInput {
    /// Target collection name.
    pub collection: String,
    /// Column name / value pairs making up the row.
    pub fields: Vec<(String, Value)>,
    /// Metadata key / value pairs supplied with the row.
    pub metadata: Vec<(String, MetadataValue)>,
    /// References to graph nodes supplied with the row.
    pub node_links: Vec<NodeRef>,
    /// References to vectors supplied with the row.
    pub vector_links: Vec<VectorRef>,
}
151
/// Input for [`EntityUseCases::create_rows_batch`]: several rows
/// submitted in one call.
#[derive(Debug, Clone)]
pub struct CreateRowsBatchInput {
    /// Target collection name for the batch.
    /// NOTE(review): each row also carries its own `collection`; how the
    /// two are reconciled is decided by the runtime — confirm.
    pub collection: String,
    /// The rows to insert.
    pub rows: Vec<CreateRowInput>,
    /// When true, no event subscriptions fire for this batch (SUPPRESS EVENTS).
    pub suppress_events: bool,
}
159
/// A named embedding supplied with a node (see
/// [`CreateNodeInput::embeddings`]).
#[derive(Debug, Clone)]
pub struct CreateNodeEmbeddingInput {
    /// Name identifying the embedding.
    pub name: String,
    /// The embedding's components.
    pub vector: Vec<f32>,
    /// Optional model identifier.
    /// NOTE(review): presumably the model that produced the vector — confirm.
    pub model: Option<String>,
}
166
/// A keyed link from a node to a table row (see
/// [`CreateNodeInput::table_links`]).
#[derive(Debug, Clone)]
pub struct CreateNodeTableLinkInput {
    /// Key under which the link is recorded.
    pub key: String,
    /// The table reference being linked.
    pub table: TableRef,
}
172
/// A graph link supplied with a node (see
/// [`CreateNodeInput::node_links`]).
#[derive(Debug, Clone)]
pub struct CreateNodeGraphLinkInput {
    /// Entity the link points at.
    pub target: EntityId,
    /// Label for the edge representing the link.
    pub edge_label: String,
    /// Weight assigned to the link.
    pub weight: f32,
}
179
/// Input for [`EntityUseCases::create_node`]: one graph node to create.
#[derive(Debug, Clone)]
pub struct CreateNodeInput {
    /// Target collection name.
    pub collection: String,
    /// Node label.
    pub label: String,
    /// Optional node type name.
    pub node_type: Option<String>,
    /// Property name / value pairs stored on the node.
    pub properties: Vec<(String, Value)>,
    /// Metadata key / value pairs supplied with the node.
    pub metadata: Vec<(String, MetadataValue)>,
    /// Named embeddings supplied with the node.
    pub embeddings: Vec<CreateNodeEmbeddingInput>,
    /// Keyed links to table rows.
    pub table_links: Vec<CreateNodeTableLinkInput>,
    /// Links to other graph entities.
    pub node_links: Vec<CreateNodeGraphLinkInput>,
}
191
/// Input for [`EntityUseCases::create_edge`]: one graph edge to create.
#[derive(Debug, Clone)]
pub struct CreateEdgeInput {
    /// Target collection name.
    pub collection: String,
    /// Edge label.
    pub label: String,
    /// Entity the edge starts from.
    pub from: EntityId,
    /// Entity the edge points to.
    pub to: EntityId,
    /// Optional edge weight.
    /// NOTE(review): the default applied when `None` is chosen by the
    /// runtime — confirm.
    pub weight: Option<f32>,
    /// Property name / value pairs stored on the edge.
    pub properties: Vec<(String, Value)>,
    /// Metadata key / value pairs supplied with the edge.
    pub metadata: Vec<(String, MetadataValue)>,
}
202
/// Input for [`EntityUseCases::create_vector`]: one vector to create.
#[derive(Debug, Clone)]
pub struct CreateVectorInput {
    /// Target collection name.
    pub collection: String,
    /// Dense components of the vector.
    pub dense: Vec<f32>,
    /// Optional text content stored with the vector.
    pub content: Option<String>,
    /// Metadata key / value pairs supplied with the vector.
    pub metadata: Vec<(String, MetadataValue)>,
    /// Optional reference to a table row associated with the vector.
    pub link_row: Option<TableRef>,
    /// Optional reference to a graph node associated with the vector.
    pub link_node: Option<NodeRef>,
}
212
/// Input for [`EntityUseCases::create_document`]: one JSON document to
/// create.
#[derive(Debug, Clone)]
pub struct CreateDocumentInput {
    /// Target collection name.
    pub collection: String,
    /// The document body as a JSON value.
    pub body: JsonValue,
    /// Metadata key / value pairs supplied with the document.
    pub metadata: Vec<(String, MetadataValue)>,
    /// References to graph nodes supplied with the document.
    pub node_links: Vec<NodeRef>,
    /// References to vectors supplied with the document.
    pub vector_links: Vec<VectorRef>,
}
221
/// Input for [`EntityUseCases::create_kv`]: one key/value entry to
/// create.
#[derive(Debug, Clone)]
pub struct CreateKvInput {
    /// Target collection name.
    pub collection: String,
    /// Key of the entry.
    pub key: String,
    /// Value stored under the key.
    pub value: Value,
    /// Metadata key / value pairs supplied with the entry.
    pub metadata: Vec<(String, MetadataValue)>,
}
229
/// Input for [`EntityUseCases::create_timeseries_point`]: one sample of
/// a time series.
#[derive(Debug, Clone)]
pub struct CreateTimeSeriesPointInput {
    /// Target collection name.
    pub collection: String,
    /// Name of the metric the sample belongs to.
    pub metric: String,
    /// Sampled value.
    pub value: f64,
    /// Optional timestamp of the sample.
    /// NOTE(review): the `_ns` suffix suggests nanoseconds; the epoch and
    /// the default used for `None` are set by the runtime — confirm.
    pub timestamp_ns: Option<u64>,
    /// Tag name / value pairs attached to the sample.
    pub tags: Vec<(String, String)>,
    /// Metadata key / value pairs supplied with the sample.
    pub metadata: Vec<(String, MetadataValue)>,
}
239
/// Input for [`EntityUseCases::delete`]: identifies the entity to
/// delete.
#[derive(Debug, Clone)]
pub struct DeleteEntityInput {
    /// Collection the entity belongs to.
    pub collection: String,
    /// Identifier of the entity to delete.
    pub id: EntityId,
}
245
/// Kind of a single patch operation.
///
/// `apply_patch_operations_to_json` handles `Set` and `Replace` through
/// the same code path (both require a value and write it at the path);
/// `Unset` removes the value at the path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PatchEntityOperationType {
    /// Write the supplied value at the path.
    Set,
    /// Write the supplied value at the path; applied the same way as
    /// `Set` by the JSON patcher in this module.
    Replace,
    /// Remove whatever is stored at the path.
    Unset,
}
252
/// One patch step: an operation kind, the path it targets, and the
/// value it writes (if any).
#[derive(Debug, Clone)]
pub struct PatchEntityOperation {
    /// What to do at `path`.
    pub op: PatchEntityOperationType,
    /// Path segments locating the target, outermost key first. Must be
    /// non-empty: the JSON patcher rejects an empty path.
    pub path: Vec<String>,
    /// Value to write. Required for `Set`/`Replace` (the JSON patcher
    /// rejects those without one); not read for `Unset`.
    pub value: Option<JsonValue>,
}
259
/// Input for [`EntityUseCases::patch`]: identifies an entity and
/// describes how to modify it.
#[derive(Debug, Clone)]
pub struct PatchEntityInput {
    /// Collection the entity belongs to.
    pub collection: String,
    /// Identifier of the entity to patch.
    pub id: EntityId,
    /// JSON payload accompanying the patch.
    /// NOTE(review): how the payload and `operations` are combined is
    /// decided by the runtime — confirm.
    pub payload: JsonValue,
    /// Ordered list of path-based patch operations.
    pub operations: Vec<PatchEntityOperation>,
}
267
/// Result returned by [`EntityUseCases::delete`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DeleteEntityOutput {
    /// Whether the runtime reports that an entity was deleted.
    pub deleted: bool,
    /// Identifier the delete request targeted.
    pub id: EntityId,
}
273
/// Application-layer facade over a [`RuntimeEntityPort`].
///
/// Every method forwards its arguments unchanged to the port method of
/// the matching name and returns that result as-is; no validation or
/// transformation happens at this layer.
pub struct EntityUseCases<'a, P: ?Sized> {
    /// Borrowed runtime port that performs the actual work.
    runtime: &'a P,
}

impl<'a, P: RuntimeEntityPort + ?Sized> EntityUseCases<'a, P> {
    /// Wraps a borrowed runtime port.
    pub fn new(runtime: &'a P) -> Self {
        Self { runtime }
    }

    /// Creates a table row; forwards to `RuntimeEntityPort::create_row`.
    pub fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
        self.runtime.create_row(input)
    }

    /// Creates several rows in one call; forwards to
    /// `RuntimeEntityPort::create_rows_batch`. One output is returned
    /// per created entity.
    pub fn create_rows_batch(
        &self,
        input: CreateRowsBatchInput,
    ) -> RedDBResult<Vec<CreateEntityOutput>> {
        self.runtime.create_rows_batch(input)
    }

    /// Creates a graph node; forwards to `RuntimeEntityPort::create_node`.
    pub fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
        self.runtime.create_node(input)
    }

    /// Creates a graph edge; forwards to `RuntimeEntityPort::create_edge`.
    pub fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
        self.runtime.create_edge(input)
    }

    /// Creates a vector; forwards to `RuntimeEntityPort::create_vector`.
    pub fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
        self.runtime.create_vector(input)
    }

    /// Creates a JSON document; forwards to
    /// `RuntimeEntityPort::create_document`.
    pub fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
        self.runtime.create_document(input)
    }

    /// Creates a key/value entry; forwards to `RuntimeEntityPort::create_kv`.
    pub fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
        self.runtime.create_kv(input)
    }

    /// Records a time-series sample; forwards to
    /// `RuntimeEntityPort::create_timeseries_point`.
    pub fn create_timeseries_point(
        &self,
        input: CreateTimeSeriesPointInput,
    ) -> RedDBResult<CreateEntityOutput> {
        self.runtime.create_timeseries_point(input)
    }

    /// Looks up a key/value entry; forwards to `RuntimeEntityPort::get_kv`.
    /// On a hit the stored value is returned together with an entity id.
    pub fn get_kv(&self, collection: &str, key: &str) -> RedDBResult<Option<(Value, EntityId)>> {
        self.runtime.get_kv(collection, key)
    }

    /// Deletes a key/value entry; forwards to `RuntimeEntityPort::delete_kv`.
    /// NOTE(review): the boolean presumably reports whether an entry
    /// existed — confirm against the port.
    pub fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
        self.runtime.delete_kv(collection, key)
    }

    /// Patches an existing entity; forwards to
    /// `RuntimeEntityPort::patch_entity`.
    pub fn patch(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
        self.runtime.patch_entity(input)
    }

    /// Deletes an entity; forwards to `RuntimeEntityPort::delete_entity`.
    pub fn delete(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
        self.runtime.delete_entity(input)
    }
}
337
338pub(crate) fn json_to_storage_value(value: &JsonValue) -> RedDBResult<Value> {
339    match value {
340        JsonValue::Null => Ok(Value::Null),
341        JsonValue::Bool(value) => Ok(Value::Boolean(*value)),
342        JsonValue::Number(value) => {
343            if value.fract().abs() < f64::EPSILON {
344                Ok(Value::Integer(*value as i64))
345            } else {
346                Ok(Value::Float(*value))
347            }
348        }
349        JsonValue::String(value) => Ok(Value::text(value.clone())),
350        JsonValue::Array(_) | JsonValue::Object(_) => json_to_vec(value)
351            .map(Value::Json)
352            .map_err(|err| RedDBError::Query(format!("failed to serialize JSON value: {err}"))),
353    }
354}
355
356pub(crate) fn json_to_metadata_value(value: &JsonValue) -> RedDBResult<MetadataValue> {
357    match value {
358        JsonValue::Null => Ok(MetadataValue::Null),
359        JsonValue::Bool(value) => Ok(MetadataValue::Bool(*value)),
360        JsonValue::Number(value) => {
361            if value.fract().abs() < f64::EPSILON {
362                Ok(MetadataValue::Int(*value as i64))
363            } else {
364                Ok(MetadataValue::Float(*value))
365            }
366        }
367        JsonValue::String(value) => Ok(MetadataValue::String(value.clone())),
368        JsonValue::Array(values) => {
369            let mut items = Vec::with_capacity(values.len());
370            for value in values {
371                items.push(json_to_metadata_value(value)?);
372            }
373            Ok(MetadataValue::Array(items))
374        }
375        JsonValue::Object(map) => {
376            let mut object = HashMap::with_capacity(map.len());
377            for (key, value) in map {
378                object.insert(key.clone(), json_to_metadata_value(value)?);
379            }
380            Ok(MetadataValue::Object(object))
381        }
382    }
383}
384
385pub(crate) fn apply_patch_operations_to_storage_map(
386    fields: &mut HashMap<String, Value>,
387    operations: &[PatchEntityOperation],
388) -> RedDBResult<()> {
389    if operations.is_empty() {
390        return Ok(());
391    }
392
393    let mut patch_target = JsonValue::Object(
394        fields
395            .iter()
396            .map(|(key, value)| (key.clone(), storage_value_to_json(value)))
397            .collect(),
398    );
399    apply_patch_operations_to_json(&mut patch_target, operations)
400        .map_err(|error| RedDBError::Query(format!("patch fields failed: {error}")))?;
401
402    let JsonValue::Object(object) = patch_target else {
403        return Err(RedDBError::Query(
404            "patch operations require object roots".to_string(),
405        ));
406    };
407
408    let mut merged = HashMap::with_capacity(object.len());
409    for (key, value) in object {
410        merged.insert(key, json_to_storage_value(&value)?);
411    }
412    *fields = merged;
413    Ok(())
414}
415
/// Public wrapper around [`apply_patch_operations_to_json`] for the HTTP
/// layer.
///
/// The KV patch handler (issue #751) parses a JSON value, applies
/// operations against the live JSON tree, and writes back — without
/// reaching the entity-row patch core that this module otherwise owns.
///
/// Arguments and result are passed through unchanged: `value` is patched
/// in place, and the `Err` string is the message produced by the wrapped
/// function.
pub fn apply_patch_operations_to_json_public(
    value: &mut JsonValue,
    operations: &[PatchEntityOperation],
) -> Result<(), String> {
    apply_patch_operations_to_json(value, operations)
}
426
427pub(crate) fn apply_patch_operations_to_json(
428    value: &mut JsonValue,
429    operations: &[PatchEntityOperation],
430) -> Result<(), String> {
431    for operation in operations {
432        if operation.path.is_empty() {
433            return Err("patch path cannot be empty".to_string());
434        }
435
436        match operation.op {
437            PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
438                let Some(patch_value) = &operation.value else {
439                    return Err("set/replace operations require a value".to_string());
440                };
441                apply_patch_json_set(value, &operation.path, patch_value.clone())?;
442            }
443            PatchEntityOperationType::Unset => {
444                apply_patch_json_unset(value, &operation.path)?;
445            }
446        }
447    }
448    Ok(())
449}
450
451pub(crate) fn apply_patch_operations_to_vector_fields(
452    vector: &mut VectorData,
453    operations: &[PatchEntityOperation],
454) -> RedDBResult<()> {
455    if operations.is_empty() {
456        return Ok(());
457    }
458
459    let mut vector_target = JsonValue::Object({
460        let mut object = Map::new();
461        object.insert(
462            "dense".to_string(),
463            JsonValue::Array(
464                vector
465                    .dense
466                    .iter()
467                    .map(|value| JsonValue::Number(*value as f64))
468                    .collect(),
469            ),
470        );
471        object.insert(
472            "sparse".to_string(),
473            vector.sparse.as_ref().map_or(JsonValue::Null, |sparse| {
474                let mut object = Map::new();
475                object.insert(
476                    "indices".to_string(),
477                    JsonValue::Array(
478                        sparse
479                            .indices
480                            .iter()
481                            .map(|value| JsonValue::Number(*value as f64))
482                            .collect(),
483                    ),
484                );
485                object.insert(
486                    "values".to_string(),
487                    JsonValue::Array(
488                        sparse
489                            .values
490                            .iter()
491                            .map(|value| JsonValue::Number(*value as f64))
492                            .collect(),
493                    ),
494                );
495                object.insert(
496                    "dimension".to_string(),
497                    JsonValue::Number(sparse.dimension as f64),
498                );
499                JsonValue::Object(object)
500            }),
501        );
502        object.insert(
503            "content".to_string(),
504            match vector.content.as_ref() {
505                Some(value) => JsonValue::String(value.clone()),
506                None => JsonValue::Null,
507            },
508        );
509        object
510    });
511
512    let touched_dense = operations
513        .iter()
514        .any(|operation| operation.path.first().is_some_and(|key| key == "dense"));
515    let touched_sparse = operations
516        .iter()
517        .any(|operation| operation.path.first().is_some_and(|key| key == "sparse"));
518    let touched_content = operations
519        .iter()
520        .any(|operation| operation.path.first().is_some_and(|key| key == "content"));
521
522    apply_patch_operations_to_json(&mut vector_target, operations)
523        .map_err(|error| RedDBError::Query(format!("patch fields failed: {error}")))?;
524
525    let JsonValue::Object(object) = vector_target else {
526        return Err(RedDBError::Query(
527            "patch operations require object roots".to_string(),
528        ));
529    };
530
531    if touched_dense {
532        let Some(value) = object.get("dense") else {
533            return Err(RedDBError::Query(
534                "field 'dense' cannot be unset".to_string(),
535            ));
536        };
537        vector.dense = parse_patch_f32_vector(value, "dense")?;
538    }
539
540    if touched_content {
541        vector.content = match object.get("content") {
542            None | Some(JsonValue::Null) => None,
543            Some(value) => Some(
544                value
545                    .as_str()
546                    .ok_or_else(|| {
547                        RedDBError::Query("field 'content' must be a string".to_string())
548                    })?
549                    .to_string(),
550            ),
551        };
552    }
553
554    if touched_sparse {
555        vector.sparse = match object.get("sparse") {
556            Some(value) => parse_sparse_vector_value(value)?,
557            None => None,
558        };
559    }
560
561    Ok(())
562}
563
564pub(crate) fn metadata_to_json(metadata: &Metadata) -> JsonValue {
565    JsonValue::Object(
566        metadata
567            .iter()
568            .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
569            .collect(),
570    )
571}
572
573pub(crate) fn metadata_from_json(payload: &JsonValue) -> RedDBResult<Metadata> {
574    let JsonValue::Object(object) = payload else {
575        return Err(RedDBError::Query(
576            "metadata patch requires an object".to_string(),
577        ));
578    };
579
580    let mut metadata = Metadata::new();
581    for (key, value) in object {
582        metadata.set(key.clone(), metadata_value_from_json(value)?);
583    }
584    Ok(metadata)
585}
586
587fn metadata_value_to_json(value: &MetadataValue) -> JsonValue {
588    match value {
589        MetadataValue::Null => JsonValue::Null,
590        MetadataValue::Bool(value) => JsonValue::Bool(*value),
591        MetadataValue::Int(value) => JsonValue::Number(*value as f64),
592        MetadataValue::Float(value) => JsonValue::Number(*value),
593        MetadataValue::String(value) => JsonValue::String(value.clone()),
594        MetadataValue::Bytes(value) => {
595            let mut object = Map::new();
596            object.insert(
597                "__redb_type".to_string(),
598                JsonValue::String("bytes".to_string()),
599            );
600            object.insert(
601                "value".to_string(),
602                JsonValue::Array(
603                    value
604                        .iter()
605                        .map(|value| JsonValue::Number(*value as f64))
606                        .collect(),
607                ),
608            );
609            JsonValue::Object(object)
610        }
611        MetadataValue::Array(values) => {
612            JsonValue::Array(values.iter().map(metadata_value_to_json).collect())
613        }
614        MetadataValue::Object(object) => JsonValue::Object(
615            object
616                .iter()
617                .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
618                .collect(),
619        ),
620        MetadataValue::Timestamp(value) => JsonValue::Number(*value as f64),
621        MetadataValue::Geo { lat, lon } => {
622            let mut object = Map::new();
623            object.insert(
624                "__redb_type".to_string(),
625                JsonValue::String("geo".to_string()),
626            );
627            object.insert("lat".to_string(), JsonValue::Number(*lat));
628            object.insert("lon".to_string(), JsonValue::Number(*lon));
629            JsonValue::Object(object)
630        }
631        MetadataValue::Reference(value) => {
632            let mut object = Map::new();
633            object.insert(
634                "__redb_type".to_string(),
635                JsonValue::String("reference".to_string()),
636            );
637            let (kind, collection, id) = match value {
638                RefTarget::TableRow { table, row_id } => ("table_row", table.as_str(), *row_id),
639                RefTarget::Node {
640                    collection,
641                    node_id,
642                } => ("node", collection.as_str(), node_id.raw()),
643                RefTarget::Edge {
644                    collection,
645                    edge_id,
646                } => ("edge", collection.as_str(), edge_id.raw()),
647                RefTarget::Vector {
648                    collection,
649                    vector_id,
650                } => ("vector", collection.as_str(), vector_id.raw()),
651                RefTarget::Entity {
652                    collection,
653                    entity_id,
654                } => ("entity", collection.as_str(), entity_id.raw()),
655            };
656            object.insert("kind".to_string(), JsonValue::String(kind.to_string()));
657            object.insert(
658                "collection".to_string(),
659                JsonValue::String(collection.to_string()),
660            );
661            object.insert("id".to_string(), JsonValue::Number(id as f64));
662            JsonValue::Object(object)
663        }
664        MetadataValue::References(values) => {
665            let mut object = Map::new();
666            object.insert(
667                "__redb_type".to_string(),
668                JsonValue::String("references".to_string()),
669            );
670            object.insert(
671                "values".to_string(),
672                JsonValue::Array(
673                    values
674                        .iter()
675                        .map(|r| metadata_value_to_json(&MetadataValue::Reference(r.clone())))
676                        .collect(),
677                ),
678            );
679            JsonValue::Object(object)
680        }
681    }
682}
683
684fn metadata_value_from_json(value: &JsonValue) -> RedDBResult<MetadataValue> {
685    match value {
686        JsonValue::Null => Ok(MetadataValue::Null),
687        JsonValue::Bool(value) => Ok(MetadataValue::Bool(*value)),
688        JsonValue::Number(value) => {
689            if value.fract().abs() < f64::EPSILON {
690                Ok(MetadataValue::Int(*value as i64))
691            } else {
692                Ok(MetadataValue::Float(*value))
693            }
694        }
695        JsonValue::String(value) => Ok(MetadataValue::String(value.clone())),
696        JsonValue::Array(values) => {
697            let mut out = Vec::with_capacity(values.len());
698            for value in values {
699                out.push(metadata_value_from_json(value)?);
700            }
701            Ok(MetadataValue::Array(out))
702        }
703        JsonValue::Object(object) => {
704            if let Some(marker) = object.get("__redb_type").and_then(JsonValue::as_str) {
705                match marker {
706                    "bytes" => {
707                        let values = object
708                            .get("value")
709                            .and_then(JsonValue::as_array)
710                            .ok_or_else(|| {
711                                RedDBError::Query(
712                                    "metadata marker 'bytes' requires array value".to_string(),
713                                )
714                            })?;
715                        let mut out = Vec::with_capacity(values.len());
716                        for value in values {
717                            let value = value.as_i64().ok_or_else(|| {
718                                RedDBError::Query(
719                                    "metadata bytes must contain integer values".to_string(),
720                                )
721                            })?;
722                            if !(0..=255).contains(&value) {
723                                return Err(RedDBError::Query(
724                                    "metadata bytes must contain values between 0 and 255"
725                                        .to_string(),
726                                ));
727                            }
728                            out.push(value as u8);
729                        }
730                        return Ok(MetadataValue::Bytes(out));
731                    }
732                    "geo" => {
733                        let lat =
734                            object
735                                .get("lat")
736                                .and_then(JsonValue::as_f64)
737                                .ok_or_else(|| {
738                                    RedDBError::Query(
739                                        "metadata marker 'geo' requires numeric 'lat'".to_string(),
740                                    )
741                                })?;
742                        let lon =
743                            object
744                                .get("lon")
745                                .and_then(JsonValue::as_f64)
746                                .ok_or_else(|| {
747                                    RedDBError::Query(
748                                        "metadata marker 'geo' requires numeric 'lon'".to_string(),
749                                    )
750                                })?;
751                        return Ok(MetadataValue::Geo { lat, lon });
752                    }
753                    "reference" => {
754                        return parse_metadata_reference(object).map(MetadataValue::Reference)
755                    }
756                    "references" => {
757                        let values = object
758                            .get("values")
759                            .and_then(JsonValue::as_array)
760                            .ok_or_else(|| {
761                                RedDBError::Query(
762                                    "metadata marker 'references' requires array 'values'"
763                                        .to_string(),
764                                )
765                            })?;
766                        let mut references = Vec::with_capacity(values.len());
767                        for value in values {
768                            references.push(parse_metadata_reference_value(value)?);
769                        }
770                        return Ok(MetadataValue::References(references));
771                    }
772                    _ => {}
773                }
774            }
775
776            let mut out = HashMap::with_capacity(object.len());
777            for (key, value) in object {
778                out.insert(key.clone(), metadata_value_from_json(value)?);
779            }
780            Ok(MetadataValue::Object(out))
781        }
782    }
783}
784
785fn parse_metadata_reference(object: &Map<String, JsonValue>) -> RedDBResult<RefTarget> {
786    let kind = object
787        .get("kind")
788        .and_then(JsonValue::as_str)
789        .ok_or_else(|| RedDBError::Query("metadata reference requires 'kind'".to_string()))?;
790    let collection = object
791        .get("collection")
792        .and_then(JsonValue::as_str)
793        .ok_or_else(|| RedDBError::Query("metadata reference requires 'collection'".to_string()))?;
794    let id = object
795        .get("id")
796        .ok_or_else(|| RedDBError::Query("metadata reference requires 'id'".to_string()))?;
797    let id = parse_patch_u64_value(id, "id")?;
798
799    let target = match kind {
800        "table_row" | "table" => RefTarget::table(collection.to_string(), id),
801        "node" => RefTarget::node(collection.to_string(), EntityId::new(id)),
802        "edge" => RefTarget::Edge {
803            collection: collection.to_string(),
804            edge_id: EntityId::new(id),
805        },
806        "vector" => RefTarget::vector(collection.to_string(), EntityId::new(id)),
807        "entity" => RefTarget::Entity {
808            collection: collection.to_string(),
809            entity_id: EntityId::new(id),
810        },
811        _ => {
812            return Err(RedDBError::Query(format!(
813                "unsupported metadata reference kind '{kind}'"
814            )));
815        }
816    };
817
818    Ok(target)
819}
820
821fn parse_metadata_reference_value(value: &JsonValue) -> RedDBResult<RefTarget> {
822    let JsonValue::Object(object) = value else {
823        return Err(RedDBError::Query(
824            "metadata reference entries must be objects".to_string(),
825        ));
826    };
827    parse_metadata_reference(object)
828}
829
830fn parse_patch_u64_value(value: &JsonValue, field: &str) -> RedDBResult<u64> {
831    let Some(value) = value.as_f64() else {
832        return Err(RedDBError::Query(format!(
833            "field '{field}' must be a number"
834        )));
835    };
836    if value.is_sign_negative() {
837        return Err(RedDBError::Query(format!(
838            "field '{field}' cannot be negative"
839        )));
840    }
841    if value.fract().abs() > f64::EPSILON {
842        return Err(RedDBError::Query(format!(
843            "field '{field}' must be an integer"
844        )));
845    }
846    if value > u64::MAX as f64 {
847        return Err(RedDBError::Query(format!("field '{field}' is too large")));
848    }
849    Ok(value as u64)
850}
851
852fn parse_patch_f32_vector(value: &JsonValue, field: &str) -> RedDBResult<Vec<f32>> {
853    let values = value
854        .as_array()
855        .ok_or_else(|| RedDBError::Query(format!("field '{field}' must be an array")))?;
856    let mut out = Vec::with_capacity(values.len());
857    for value in values {
858        let number = value.as_f64().ok_or_else(|| {
859            RedDBError::Query(format!("field '{field}' must contain only numbers"))
860        })?;
861        out.push(number as f32);
862    }
863    if out.is_empty() {
864        return Err(RedDBError::Query(format!(
865            "field '{field}' cannot be empty"
866        )));
867    }
868    Ok(out)
869}
870
871fn parse_sparse_index_array(value: &JsonValue, field: &str) -> RedDBResult<Vec<u32>> {
872    let values = value
873        .as_array()
874        .ok_or_else(|| RedDBError::Query(format!("field '{field}' must be an array")))?;
875    let mut out = Vec::with_capacity(values.len());
876    for value in values {
877        let value = value.as_f64().ok_or_else(|| {
878            RedDBError::Query(format!("field '{field}' must contain only integers"))
879        })?;
880        if value.is_sign_negative() || value.fract().abs() > f64::EPSILON {
881            return Err(RedDBError::Query(format!(
882                "field '{field}' must contain only u32 values"
883            )));
884        }
885        if value > u32::MAX as f64 {
886            return Err(RedDBError::Query(format!(
887                "field '{field}' value is too large"
888            )));
889        }
890        out.push(value as u32);
891    }
892    Ok(out)
893}
894
/// Parses the `values` array of a sparse vector payload.
///
/// Thin wrapper over `parse_patch_f32_vector`, so the same rules apply: the
/// input must be a non-empty JSON array of numbers, each narrowed to `f32`.
/// `field` is passed through and only labels the error messages.
fn parse_sparse_value_array(value: &JsonValue, field: &str) -> RedDBResult<Vec<f32>> {
    parse_patch_f32_vector(value, field)
}
898
899fn parse_sparse_vector_value(value: &JsonValue) -> RedDBResult<Option<SparseVector>> {
900    match value {
901        JsonValue::Null => Ok(None),
902        JsonValue::Object(object) => {
903            let indices = parse_sparse_index_array(
904                object.get("indices").ok_or_else(|| {
905                    RedDBError::Query("sparse metadata requires 'indices'".to_string())
906                })?,
907                "sparse.indices",
908            )?;
909            let values = parse_sparse_value_array(
910                object.get("values").ok_or_else(|| {
911                    RedDBError::Query("sparse metadata requires 'values'".to_string())
912                })?,
913                "sparse.values",
914            )?;
915            if indices.len() != values.len() {
916                return Err(RedDBError::Query(
917                    "sparse indices and values lengths must match".to_string(),
918                ));
919            }
920            let dimension = match object.get("dimension").and_then(JsonValue::as_f64) {
921                Some(value) => {
922                    if value.is_sign_negative() || value.fract().abs() > f64::EPSILON {
923                        return Err(RedDBError::Query(
924                            "sparse dimension must be a non-negative integer".to_string(),
925                        ));
926                    }
927                    if value > usize::MAX as f64 {
928                        return Err(RedDBError::Query(
929                            "sparse dimension is too large".to_string(),
930                        ));
931                    }
932                    value as usize
933                }
934                None => indices
935                    .iter()
936                    .max()
937                    .map_or(0, |index| (*index as usize) + 1),
938            };
939            if indices.iter().any(|index| (*index as usize) >= dimension) {
940                return Err(RedDBError::Query(
941                    "sparse indices must be smaller than dimension".to_string(),
942                ));
943            }
944            Ok(Some(SparseVector::new(indices, values, dimension)))
945        }
946        _ => Err(RedDBError::Query(
947            "field 'sparse' must be an object or null".to_string(),
948        )),
949    }
950}
951
952fn apply_patch_json_set(
953    target: &mut JsonValue,
954    path: &[String],
955    value: JsonValue,
956) -> Result<(), String> {
957    if path.is_empty() {
958        return Err("patch path cannot be empty".to_string());
959    }
960
961    let mut current = target;
962    for segment in &path[..path.len() - 1] {
963        let JsonValue::Object(object) = current else {
964            return Err("patch path target must be an object".to_string());
965        };
966        let value = object
967            .entry(segment.clone())
968            .or_insert_with(|| JsonValue::Object(Map::new()));
969        if !matches!(value, JsonValue::Object(_)) {
970            *value = JsonValue::Object(Map::new());
971        }
972        current = value;
973    }
974
975    let JsonValue::Object(object) = current else {
976        return Err("patch path target must be an object".to_string());
977    };
978    object.insert(path[path.len() - 1].clone(), value);
979    Ok(())
980}
981
982fn apply_patch_json_unset(target: &mut JsonValue, path: &[String]) -> Result<(), String> {
983    if path.is_empty() {
984        return Err("patch path cannot be empty".to_string());
985    }
986
987    if path.len() == 1 {
988        let JsonValue::Object(object) = target else {
989            return Err("patch path target must be an object".to_string());
990        };
991        object.remove(&path[0]);
992        return Ok(());
993    }
994
995    let mut current = target;
996    for segment in &path[..path.len() - 1] {
997        let Some(value) = (match current {
998            JsonValue::Object(object) => object.get_mut(segment),
999            _ => {
1000                return Err("patch path target must be an object".to_string());
1001            }
1002        }) else {
1003            return Ok(());
1004        };
1005
1006        if !matches!(value, JsonValue::Object(_)) {
1007            return Ok(());
1008        }
1009        current = value;
1010    }
1011
1012    let JsonValue::Object(object) = current else {
1013        return Err("patch path target must be an object".to_string());
1014    };
1015    object.remove(&path[path.len() - 1]);
1016    Ok(())
1017}
1018
/// Renders a 6-byte MAC address as lowercase, zero-padded hex octets
/// separated by colons, e.g. `00:1a:2b:3c:4d:5e`.
fn format_mac(bytes: &[u8; 6]) -> String {
    let [a, b, c, d, e, f] = *bytes;
    format!("{a:02x}:{b:02x}:{c:02x}:{d:02x}:{e:02x}:{f:02x}")
}
1026
#[cfg(test)]
mod damage_vector_tests {
    //! Unit tests for `row_damage_vector`: each case builds a "before" and
    //! an "after" row and checks which columns land in `changed`, `added`
    //! and `removed`.

    use super::*;

    /// Builds an owned row from `(column, value)` pairs.
    fn row(fields: Vec<(&str, Value)>) -> Vec<(String, Value)> {
        fields
            .into_iter()
            .map(|(column, value)| (column.to_string(), value))
            .collect()
    }

    /// Shorthand for a text cell.
    fn text(raw: &str) -> Value {
        Value::text(raw.to_string())
    }

    #[test]
    fn identical_rows_produce_empty_vector() {
        let before = row(vec![
            ("name", text("alice")),
            ("age", Value::Integer(30)),
        ]);
        let after = before.clone();

        let damage = row_damage_vector(&before, &after);

        assert!(damage.is_empty());
        assert!(damage.touched_columns().is_empty());
    }

    #[test]
    fn detects_changed_column_only() {
        let before = row(vec![
            ("name", text("alice")),
            ("age", Value::Integer(30)),
        ]);
        let after = row(vec![
            ("name", text("alice")),
            ("age", Value::Integer(31)),
        ]);

        let damage = row_damage_vector(&before, &after);

        assert_eq!(damage.changed.len(), 1);
        assert_eq!(damage.changed[0].0, "age");
        assert!(damage.added.is_empty());
        assert!(damage.removed.is_empty());
        assert_eq!(damage.touched_columns(), vec!["age"]);
    }

    #[test]
    fn detects_added_and_removed_columns() {
        let before = row(vec![
            ("name", text("alice")),
            ("nickname", text("al")),
        ]);
        let after = row(vec![
            ("name", text("alice")),
            ("email", text("a@x.com")),
        ]);

        let damage = row_damage_vector(&before, &after);

        assert!(damage.changed.is_empty());
        assert_eq!(damage.added.len(), 1);
        assert_eq!(damage.added[0].0, "email");
        assert_eq!(damage.removed.len(), 1);
        assert_eq!(damage.removed[0].0, "nickname");
    }

    #[test]
    fn field_order_does_not_affect_diff() {
        // The same cells listed as `(name, age)` and as `(age, name)` must
        // diff as empty: column order is a presentation concern.
        let before = row(vec![
            ("name", text("bob")),
            ("age", Value::Integer(42)),
        ]);
        let after = row(vec![
            ("age", Value::Integer(42)),
            ("name", text("bob")),
        ]);

        let damage = row_damage_vector(&before, &after);

        assert!(damage.is_empty());
    }

    #[test]
    fn mixed_changed_added_removed() {
        let before = row(vec![
            ("a", Value::Integer(1)),
            ("b", Value::Integer(2)),
            ("gone", text("x")),
        ]);
        let after = row(vec![
            ("a", Value::Integer(10)),     // changed
            ("b", Value::Integer(2)),      // unchanged
            ("new", Value::Boolean(true)), // added
        ]);

        let damage = row_damage_vector(&before, &after);

        assert_eq!(damage.changed.len(), 1);
        assert_eq!(damage.added.len(), 1);
        assert_eq!(damage.removed.len(), 1);

        // Sort before comparing so the assertion does not depend on the
        // order in which touched columns are reported.
        let mut touched: Vec<&str> = damage.touched_columns();
        touched.sort();
        assert_eq!(touched, vec!["a", "gone", "new"]);
    }
}