Skip to main content

reddb_server/application/
entity.rs

1use std::collections::HashMap;
2
3use crate::application::ports::RuntimeEntityPort;
4use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
5use crate::presentation::entity_json::storage_value_to_json;
6use crate::storage::schema::{DataType, Value};
7use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
8use crate::storage::unified::{Metadata, MetadataValue, RefTarget, SparseVector, VectorData};
9use crate::storage::{EntityId, UnifiedEntity};
10use crate::{RedDBError, RedDBResult};
11
12#[derive(Debug, Clone)]
13pub struct CreateEntityOutput {
14    pub id: EntityId,
15    pub entity: Option<UnifiedEntity>,
16}
17
18#[derive(Debug, Clone)]
19pub struct AppliedEntityMutation {
20    pub id: EntityId,
21    pub collection: String,
22    pub entity: UnifiedEntity,
23    pub metadata: Option<crate::storage::unified::Metadata>,
24    pub modified_columns: Vec<String>,
25    pub persist_metadata: bool,
26    pub context_index_dirty: bool,
27    /// Snapshot of the row's named fields BEFORE the mutation was
28    /// applied. Carried so the post-write secondary-index hook can
29    /// `delete(old) + insert(new)` for changed indexed columns.
30    /// Empty when the entity isn't a row or the row carried neither a
31    /// `named` map nor a `schema` Arc.
32    pub pre_mutation_fields: Vec<(String, Value)>,
33}
34
35/// Damage-vector for a row update: the minimal diff between the
36/// pre-mutation and post-mutation field state. Downstream consumers
37/// (secondary-index maintainer, CDC emitter, eventually the pager's
38/// in-place update path) work off this struct so they only touch
39/// columns that actually changed.
40///
41/// Future work (Fase 5): the pager can decide whether `changed` fits
42/// in the existing cell's slotted footprint and patch bytes in place
43/// instead of rewriting the whole row.
44#[derive(Debug, Clone, Default, PartialEq)]
45pub struct RowDamageVector {
46    /// Columns present in both `old` and `new` whose value differs.
47    /// `(column, old_value, new_value)`.
48    pub changed: Vec<(String, Value, Value)>,
49    /// Columns present in `new` but not in `old`.
50    pub added: Vec<(String, Value)>,
51    /// Columns present in `old` but not in `new`.
52    pub removed: Vec<(String, Value)>,
53}
54
55impl RowDamageVector {
56    /// True when no columns changed, were added, or were removed.
57    /// Callers can short-circuit index/CDC/in-place work when this
58    /// holds.
59    pub fn is_empty(&self) -> bool {
60        self.changed.is_empty() && self.added.is_empty() && self.removed.is_empty()
61    }
62
63    /// Names of every column the update touched. Equivalent to the
64    /// current `modified_columns` list carried on `AppliedEntityMutation`
65    /// but computed from the ground truth of old/new rather than
66    /// from what the SQL executor thinks it wrote.
67    pub fn touched_columns(&self) -> Vec<&str> {
68        let mut out: Vec<&str> =
69            Vec::with_capacity(self.changed.len() + self.added.len() + self.removed.len());
70        out.extend(self.changed.iter().map(|(c, _, _)| c.as_str()));
71        out.extend(self.added.iter().map(|(c, _)| c.as_str()));
72        out.extend(self.removed.iter().map(|(c, _)| c.as_str()));
73        out
74    }
75}
76
77/// Compute the damage-vector between an old and new row snapshot.
78/// Both inputs are field lists (as carried by `CreateRowInput` and
79/// `AppliedEntityMutation.pre_mutation_fields`); the order of fields
80/// is not significant. Columns present in both sides with identical
81/// values don't appear in any bucket.
82pub fn row_damage_vector(
83    old_fields: &[(String, Value)],
84    new_fields: &[(String, Value)],
85) -> RowDamageVector {
86    // HashMap<&str, &Value> keyed by column name for O(1) membership
87    // checks. Both inputs are expected to be small (tens of columns),
88    // so the allocation overhead is negligible vs the O(N*M) pairwise
89    // comparison we'd otherwise need.
90    let old_map: HashMap<&str, &Value> = old_fields.iter().map(|(k, v)| (k.as_str(), v)).collect();
91    let new_map: HashMap<&str, &Value> = new_fields.iter().map(|(k, v)| (k.as_str(), v)).collect();
92
93    let mut changed = Vec::new();
94    let mut added = Vec::new();
95    let mut removed = Vec::new();
96
97    for (name, new_value) in &new_map {
98        match old_map.get(name) {
99            Some(old_value) if old_value == new_value => {}
100            Some(old_value) => changed.push((
101                (*name).to_string(),
102                (*old_value).clone(),
103                (*new_value).clone(),
104            )),
105            None => added.push(((*name).to_string(), (*new_value).clone())),
106        }
107    }
108    for (name, old_value) in &old_map {
109        if !new_map.contains_key(name) {
110            removed.push(((*name).to_string(), (*old_value).clone()));
111        }
112    }
113
114    RowDamageVector {
115        changed,
116        added,
117        removed,
118    }
119}
120
121#[derive(Debug, Clone)]
122pub struct RowUpdateColumnRule {
123    pub name: String,
124    pub data_type: DataType,
125    pub data_type_name: String,
126    pub not_null: bool,
127    pub enum_variants: Vec<String>,
128}
129
130#[derive(Debug, Clone)]
131pub struct RowUpdateContractPlan {
132    pub timestamps_enabled: bool,
133    pub strict_schema: bool,
134    pub declared_rules: HashMap<String, RowUpdateColumnRule>,
135    pub unique_columns: HashMap<String, ()>,
136}
137
138#[derive(Debug, Clone)]
139pub struct CreateRowInput {
140    pub collection: String,
141    pub fields: Vec<(String, Value)>,
142    pub metadata: Vec<(String, MetadataValue)>,
143    pub node_links: Vec<NodeRef>,
144    pub vector_links: Vec<VectorRef>,
145}
146
147#[derive(Debug, Clone)]
148pub struct CreateRowsBatchInput {
149    pub collection: String,
150    pub rows: Vec<CreateRowInput>,
151    /// When true, no event subscriptions fire for this batch (SUPPRESS EVENTS).
152    pub suppress_events: bool,
153}
154
155#[derive(Debug, Clone)]
156pub struct CreateNodeEmbeddingInput {
157    pub name: String,
158    pub vector: Vec<f32>,
159    pub model: Option<String>,
160}
161
162#[derive(Debug, Clone)]
163pub struct CreateNodeTableLinkInput {
164    pub key: String,
165    pub table: TableRef,
166}
167
168#[derive(Debug, Clone)]
169pub struct CreateNodeGraphLinkInput {
170    pub target: EntityId,
171    pub edge_label: String,
172    pub weight: f32,
173}
174
175#[derive(Debug, Clone)]
176pub struct CreateNodeInput {
177    pub collection: String,
178    pub label: String,
179    pub node_type: Option<String>,
180    pub properties: Vec<(String, Value)>,
181    pub metadata: Vec<(String, MetadataValue)>,
182    pub embeddings: Vec<CreateNodeEmbeddingInput>,
183    pub table_links: Vec<CreateNodeTableLinkInput>,
184    pub node_links: Vec<CreateNodeGraphLinkInput>,
185}
186
187#[derive(Debug, Clone)]
188pub struct CreateEdgeInput {
189    pub collection: String,
190    pub label: String,
191    pub from: EntityId,
192    pub to: EntityId,
193    pub weight: Option<f32>,
194    pub properties: Vec<(String, Value)>,
195    pub metadata: Vec<(String, MetadataValue)>,
196}
197
198#[derive(Debug, Clone)]
199pub struct CreateVectorInput {
200    pub collection: String,
201    pub dense: Vec<f32>,
202    pub content: Option<String>,
203    pub metadata: Vec<(String, MetadataValue)>,
204    pub link_row: Option<TableRef>,
205    pub link_node: Option<NodeRef>,
206}
207
208#[derive(Debug, Clone)]
209pub struct CreateDocumentInput {
210    pub collection: String,
211    pub body: JsonValue,
212    pub metadata: Vec<(String, MetadataValue)>,
213    pub node_links: Vec<NodeRef>,
214    pub vector_links: Vec<VectorRef>,
215}
216
217#[derive(Debug, Clone)]
218pub struct CreateKvInput {
219    pub collection: String,
220    pub key: String,
221    pub value: Value,
222    pub metadata: Vec<(String, MetadataValue)>,
223}
224
225#[derive(Debug, Clone)]
226pub struct CreateTimeSeriesPointInput {
227    pub collection: String,
228    pub metric: String,
229    pub value: f64,
230    pub timestamp_ns: Option<u64>,
231    pub tags: Vec<(String, String)>,
232    pub metadata: Vec<(String, MetadataValue)>,
233}
234
235#[derive(Debug, Clone)]
236pub struct DeleteEntityInput {
237    pub collection: String,
238    pub id: EntityId,
239}
240
241#[derive(Debug, Clone, Copy, PartialEq, Eq)]
242pub enum PatchEntityOperationType {
243    Set,
244    Replace,
245    Unset,
246}
247
248#[derive(Debug, Clone)]
249pub struct PatchEntityOperation {
250    pub op: PatchEntityOperationType,
251    pub path: Vec<String>,
252    pub value: Option<JsonValue>,
253}
254
255#[derive(Debug, Clone)]
256pub struct PatchEntityInput {
257    pub collection: String,
258    pub id: EntityId,
259    pub payload: JsonValue,
260    pub operations: Vec<PatchEntityOperation>,
261}
262
263#[derive(Debug, Clone, Copy, PartialEq, Eq)]
264pub struct DeleteEntityOutput {
265    pub deleted: bool,
266    pub id: EntityId,
267}
268
269pub struct EntityUseCases<'a, P: ?Sized> {
270    runtime: &'a P,
271}
272
273impl<'a, P: RuntimeEntityPort + ?Sized> EntityUseCases<'a, P> {
274    pub fn new(runtime: &'a P) -> Self {
275        Self { runtime }
276    }
277
278    pub fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
279        self.runtime.create_row(input)
280    }
281
282    pub fn create_rows_batch(
283        &self,
284        input: CreateRowsBatchInput,
285    ) -> RedDBResult<Vec<CreateEntityOutput>> {
286        self.runtime.create_rows_batch(input)
287    }
288
289    pub fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
290        self.runtime.create_node(input)
291    }
292
293    pub fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
294        self.runtime.create_edge(input)
295    }
296
297    pub fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
298        self.runtime.create_vector(input)
299    }
300
301    pub fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
302        self.runtime.create_document(input)
303    }
304
305    pub fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
306        self.runtime.create_kv(input)
307    }
308
309    pub fn create_timeseries_point(
310        &self,
311        input: CreateTimeSeriesPointInput,
312    ) -> RedDBResult<CreateEntityOutput> {
313        self.runtime.create_timeseries_point(input)
314    }
315
316    pub fn get_kv(&self, collection: &str, key: &str) -> RedDBResult<Option<(Value, EntityId)>> {
317        self.runtime.get_kv(collection, key)
318    }
319
320    pub fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
321        self.runtime.delete_kv(collection, key)
322    }
323
324    pub fn patch(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
325        self.runtime.patch_entity(input)
326    }
327
328    pub fn delete(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
329        self.runtime.delete_entity(input)
330    }
331}
332
333pub(crate) fn json_to_storage_value(value: &JsonValue) -> RedDBResult<Value> {
334    match value {
335        JsonValue::Null => Ok(Value::Null),
336        JsonValue::Bool(value) => Ok(Value::Boolean(*value)),
337        JsonValue::Number(value) => {
338            if value.fract().abs() < f64::EPSILON {
339                Ok(Value::Integer(*value as i64))
340            } else {
341                Ok(Value::Float(*value))
342            }
343        }
344        JsonValue::String(value) => Ok(Value::text(value.clone())),
345        JsonValue::Array(_) | JsonValue::Object(_) => json_to_vec(value)
346            .map(Value::Json)
347            .map_err(|err| RedDBError::Query(format!("failed to serialize JSON value: {err}"))),
348    }
349}
350
351pub(crate) fn json_to_metadata_value(value: &JsonValue) -> RedDBResult<MetadataValue> {
352    match value {
353        JsonValue::Null => Ok(MetadataValue::Null),
354        JsonValue::Bool(value) => Ok(MetadataValue::Bool(*value)),
355        JsonValue::Number(value) => {
356            if value.fract().abs() < f64::EPSILON {
357                Ok(MetadataValue::Int(*value as i64))
358            } else {
359                Ok(MetadataValue::Float(*value))
360            }
361        }
362        JsonValue::String(value) => Ok(MetadataValue::String(value.clone())),
363        JsonValue::Array(values) => {
364            let mut items = Vec::with_capacity(values.len());
365            for value in values {
366                items.push(json_to_metadata_value(value)?);
367            }
368            Ok(MetadataValue::Array(items))
369        }
370        JsonValue::Object(map) => {
371            let mut object = HashMap::with_capacity(map.len());
372            for (key, value) in map {
373                object.insert(key.clone(), json_to_metadata_value(value)?);
374            }
375            Ok(MetadataValue::Object(object))
376        }
377    }
378}
379
380pub(crate) fn apply_patch_operations_to_storage_map(
381    fields: &mut HashMap<String, Value>,
382    operations: &[PatchEntityOperation],
383) -> RedDBResult<()> {
384    if operations.is_empty() {
385        return Ok(());
386    }
387
388    let mut patch_target = JsonValue::Object(
389        fields
390            .iter()
391            .map(|(key, value)| (key.clone(), storage_value_to_json(value)))
392            .collect(),
393    );
394    apply_patch_operations_to_json(&mut patch_target, operations)
395        .map_err(|error| RedDBError::Query(format!("patch fields failed: {error}")))?;
396
397    let JsonValue::Object(object) = patch_target else {
398        return Err(RedDBError::Query(
399            "patch operations require object roots".to_string(),
400        ));
401    };
402
403    let mut merged = HashMap::with_capacity(object.len());
404    for (key, value) in object {
405        merged.insert(key, json_to_storage_value(&value)?);
406    }
407    *fields = merged;
408    Ok(())
409}
410
411pub(crate) fn apply_patch_operations_to_json(
412    value: &mut JsonValue,
413    operations: &[PatchEntityOperation],
414) -> Result<(), String> {
415    for operation in operations {
416        if operation.path.is_empty() {
417            return Err("patch path cannot be empty".to_string());
418        }
419
420        match operation.op {
421            PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
422                let Some(patch_value) = &operation.value else {
423                    return Err("set/replace operations require a value".to_string());
424                };
425                apply_patch_json_set(value, &operation.path, patch_value.clone())?;
426            }
427            PatchEntityOperationType::Unset => {
428                apply_patch_json_unset(value, &operation.path)?;
429            }
430        }
431    }
432    Ok(())
433}
434
435pub(crate) fn apply_patch_operations_to_vector_fields(
436    vector: &mut VectorData,
437    operations: &[PatchEntityOperation],
438) -> RedDBResult<()> {
439    if operations.is_empty() {
440        return Ok(());
441    }
442
443    let mut vector_target = JsonValue::Object({
444        let mut object = Map::new();
445        object.insert(
446            "dense".to_string(),
447            JsonValue::Array(
448                vector
449                    .dense
450                    .iter()
451                    .map(|value| JsonValue::Number(*value as f64))
452                    .collect(),
453            ),
454        );
455        object.insert(
456            "sparse".to_string(),
457            vector.sparse.as_ref().map_or(JsonValue::Null, |sparse| {
458                let mut object = Map::new();
459                object.insert(
460                    "indices".to_string(),
461                    JsonValue::Array(
462                        sparse
463                            .indices
464                            .iter()
465                            .map(|value| JsonValue::Number(*value as f64))
466                            .collect(),
467                    ),
468                );
469                object.insert(
470                    "values".to_string(),
471                    JsonValue::Array(
472                        sparse
473                            .values
474                            .iter()
475                            .map(|value| JsonValue::Number(*value as f64))
476                            .collect(),
477                    ),
478                );
479                object.insert(
480                    "dimension".to_string(),
481                    JsonValue::Number(sparse.dimension as f64),
482                );
483                JsonValue::Object(object)
484            }),
485        );
486        object.insert(
487            "content".to_string(),
488            match vector.content.as_ref() {
489                Some(value) => JsonValue::String(value.clone()),
490                None => JsonValue::Null,
491            },
492        );
493        object
494    });
495
496    let touched_dense = operations
497        .iter()
498        .any(|operation| operation.path.first().is_some_and(|key| key == "dense"));
499    let touched_sparse = operations
500        .iter()
501        .any(|operation| operation.path.first().is_some_and(|key| key == "sparse"));
502    let touched_content = operations
503        .iter()
504        .any(|operation| operation.path.first().is_some_and(|key| key == "content"));
505
506    apply_patch_operations_to_json(&mut vector_target, operations)
507        .map_err(|error| RedDBError::Query(format!("patch fields failed: {error}")))?;
508
509    let JsonValue::Object(object) = vector_target else {
510        return Err(RedDBError::Query(
511            "patch operations require object roots".to_string(),
512        ));
513    };
514
515    if touched_dense {
516        let Some(value) = object.get("dense") else {
517            return Err(RedDBError::Query(
518                "field 'dense' cannot be unset".to_string(),
519            ));
520        };
521        vector.dense = parse_patch_f32_vector(value, "dense")?;
522    }
523
524    if touched_content {
525        vector.content = match object.get("content") {
526            None | Some(JsonValue::Null) => None,
527            Some(value) => Some(
528                value
529                    .as_str()
530                    .ok_or_else(|| {
531                        RedDBError::Query("field 'content' must be a string".to_string())
532                    })?
533                    .to_string(),
534            ),
535        };
536    }
537
538    if touched_sparse {
539        vector.sparse = match object.get("sparse") {
540            Some(value) => parse_sparse_vector_value(value)?,
541            None => None,
542        };
543    }
544
545    Ok(())
546}
547
548pub(crate) fn metadata_to_json(metadata: &Metadata) -> JsonValue {
549    JsonValue::Object(
550        metadata
551            .iter()
552            .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
553            .collect(),
554    )
555}
556
557pub(crate) fn metadata_from_json(payload: &JsonValue) -> RedDBResult<Metadata> {
558    let JsonValue::Object(object) = payload else {
559        return Err(RedDBError::Query(
560            "metadata patch requires an object".to_string(),
561        ));
562    };
563
564    let mut metadata = Metadata::new();
565    for (key, value) in object {
566        metadata.set(key.clone(), metadata_value_from_json(value)?);
567    }
568    Ok(metadata)
569}
570
571fn metadata_value_to_json(value: &MetadataValue) -> JsonValue {
572    match value {
573        MetadataValue::Null => JsonValue::Null,
574        MetadataValue::Bool(value) => JsonValue::Bool(*value),
575        MetadataValue::Int(value) => JsonValue::Number(*value as f64),
576        MetadataValue::Float(value) => JsonValue::Number(*value),
577        MetadataValue::String(value) => JsonValue::String(value.clone()),
578        MetadataValue::Bytes(value) => {
579            let mut object = Map::new();
580            object.insert(
581                "__redb_type".to_string(),
582                JsonValue::String("bytes".to_string()),
583            );
584            object.insert(
585                "value".to_string(),
586                JsonValue::Array(
587                    value
588                        .iter()
589                        .map(|value| JsonValue::Number(*value as f64))
590                        .collect(),
591                ),
592            );
593            JsonValue::Object(object)
594        }
595        MetadataValue::Array(values) => {
596            JsonValue::Array(values.iter().map(metadata_value_to_json).collect())
597        }
598        MetadataValue::Object(object) => JsonValue::Object(
599            object
600                .iter()
601                .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
602                .collect(),
603        ),
604        MetadataValue::Timestamp(value) => JsonValue::Number(*value as f64),
605        MetadataValue::Geo { lat, lon } => {
606            let mut object = Map::new();
607            object.insert(
608                "__redb_type".to_string(),
609                JsonValue::String("geo".to_string()),
610            );
611            object.insert("lat".to_string(), JsonValue::Number(*lat));
612            object.insert("lon".to_string(), JsonValue::Number(*lon));
613            JsonValue::Object(object)
614        }
615        MetadataValue::Reference(value) => {
616            let mut object = Map::new();
617            object.insert(
618                "__redb_type".to_string(),
619                JsonValue::String("reference".to_string()),
620            );
621            let (kind, collection, id) = match value {
622                RefTarget::TableRow { table, row_id } => ("table_row", table.as_str(), *row_id),
623                RefTarget::Node {
624                    collection,
625                    node_id,
626                } => ("node", collection.as_str(), node_id.raw()),
627                RefTarget::Edge {
628                    collection,
629                    edge_id,
630                } => ("edge", collection.as_str(), edge_id.raw()),
631                RefTarget::Vector {
632                    collection,
633                    vector_id,
634                } => ("vector", collection.as_str(), vector_id.raw()),
635                RefTarget::Entity {
636                    collection,
637                    entity_id,
638                } => ("entity", collection.as_str(), entity_id.raw()),
639            };
640            object.insert("kind".to_string(), JsonValue::String(kind.to_string()));
641            object.insert(
642                "collection".to_string(),
643                JsonValue::String(collection.to_string()),
644            );
645            object.insert("id".to_string(), JsonValue::Number(id as f64));
646            JsonValue::Object(object)
647        }
648        MetadataValue::References(values) => {
649            let mut object = Map::new();
650            object.insert(
651                "__redb_type".to_string(),
652                JsonValue::String("references".to_string()),
653            );
654            object.insert(
655                "values".to_string(),
656                JsonValue::Array(
657                    values
658                        .iter()
659                        .map(|r| metadata_value_to_json(&MetadataValue::Reference(r.clone())))
660                        .collect(),
661                ),
662            );
663            JsonValue::Object(object)
664        }
665    }
666}
667
668fn metadata_value_from_json(value: &JsonValue) -> RedDBResult<MetadataValue> {
669    match value {
670        JsonValue::Null => Ok(MetadataValue::Null),
671        JsonValue::Bool(value) => Ok(MetadataValue::Bool(*value)),
672        JsonValue::Number(value) => {
673            if value.fract().abs() < f64::EPSILON {
674                Ok(MetadataValue::Int(*value as i64))
675            } else {
676                Ok(MetadataValue::Float(*value))
677            }
678        }
679        JsonValue::String(value) => Ok(MetadataValue::String(value.clone())),
680        JsonValue::Array(values) => {
681            let mut out = Vec::with_capacity(values.len());
682            for value in values {
683                out.push(metadata_value_from_json(value)?);
684            }
685            Ok(MetadataValue::Array(out))
686        }
687        JsonValue::Object(object) => {
688            if let Some(marker) = object.get("__redb_type").and_then(JsonValue::as_str) {
689                match marker {
690                    "bytes" => {
691                        let values = object
692                            .get("value")
693                            .and_then(JsonValue::as_array)
694                            .ok_or_else(|| {
695                                RedDBError::Query(
696                                    "metadata marker 'bytes' requires array value".to_string(),
697                                )
698                            })?;
699                        let mut out = Vec::with_capacity(values.len());
700                        for value in values {
701                            let value = value.as_i64().ok_or_else(|| {
702                                RedDBError::Query(
703                                    "metadata bytes must contain integer values".to_string(),
704                                )
705                            })?;
706                            if !(0..=255).contains(&value) {
707                                return Err(RedDBError::Query(
708                                    "metadata bytes must contain values between 0 and 255"
709                                        .to_string(),
710                                ));
711                            }
712                            out.push(value as u8);
713                        }
714                        return Ok(MetadataValue::Bytes(out));
715                    }
716                    "geo" => {
717                        let lat =
718                            object
719                                .get("lat")
720                                .and_then(JsonValue::as_f64)
721                                .ok_or_else(|| {
722                                    RedDBError::Query(
723                                        "metadata marker 'geo' requires numeric 'lat'".to_string(),
724                                    )
725                                })?;
726                        let lon =
727                            object
728                                .get("lon")
729                                .and_then(JsonValue::as_f64)
730                                .ok_or_else(|| {
731                                    RedDBError::Query(
732                                        "metadata marker 'geo' requires numeric 'lon'".to_string(),
733                                    )
734                                })?;
735                        return Ok(MetadataValue::Geo { lat, lon });
736                    }
737                    "reference" => {
738                        return parse_metadata_reference(object).map(MetadataValue::Reference)
739                    }
740                    "references" => {
741                        let values = object
742                            .get("values")
743                            .and_then(JsonValue::as_array)
744                            .ok_or_else(|| {
745                                RedDBError::Query(
746                                    "metadata marker 'references' requires array 'values'"
747                                        .to_string(),
748                                )
749                            })?;
750                        let mut references = Vec::with_capacity(values.len());
751                        for value in values {
752                            references.push(parse_metadata_reference_value(value)?);
753                        }
754                        return Ok(MetadataValue::References(references));
755                    }
756                    _ => {}
757                }
758            }
759
760            let mut out = HashMap::with_capacity(object.len());
761            for (key, value) in object {
762                out.insert(key.clone(), metadata_value_from_json(value)?);
763            }
764            Ok(MetadataValue::Object(out))
765        }
766    }
767}
768
769fn parse_metadata_reference(object: &Map<String, JsonValue>) -> RedDBResult<RefTarget> {
770    let kind = object
771        .get("kind")
772        .and_then(JsonValue::as_str)
773        .ok_or_else(|| RedDBError::Query("metadata reference requires 'kind'".to_string()))?;
774    let collection = object
775        .get("collection")
776        .and_then(JsonValue::as_str)
777        .ok_or_else(|| RedDBError::Query("metadata reference requires 'collection'".to_string()))?;
778    let id = object
779        .get("id")
780        .ok_or_else(|| RedDBError::Query("metadata reference requires 'id'".to_string()))?;
781    let id = parse_patch_u64_value(id, "id")?;
782
783    let target = match kind {
784        "table_row" | "table" => RefTarget::table(collection.to_string(), id),
785        "node" => RefTarget::node(collection.to_string(), EntityId::new(id)),
786        "edge" => RefTarget::Edge {
787            collection: collection.to_string(),
788            edge_id: EntityId::new(id),
789        },
790        "vector" => RefTarget::vector(collection.to_string(), EntityId::new(id)),
791        "entity" => RefTarget::Entity {
792            collection: collection.to_string(),
793            entity_id: EntityId::new(id),
794        },
795        _ => {
796            return Err(RedDBError::Query(format!(
797                "unsupported metadata reference kind '{kind}'"
798            )));
799        }
800    };
801
802    Ok(target)
803}
804
805fn parse_metadata_reference_value(value: &JsonValue) -> RedDBResult<RefTarget> {
806    let JsonValue::Object(object) = value else {
807        return Err(RedDBError::Query(
808            "metadata reference entries must be objects".to_string(),
809        ));
810    };
811    parse_metadata_reference(object)
812}
813
814fn parse_patch_u64_value(value: &JsonValue, field: &str) -> RedDBResult<u64> {
815    let Some(value) = value.as_f64() else {
816        return Err(RedDBError::Query(format!(
817            "field '{field}' must be a number"
818        )));
819    };
820    if value.is_sign_negative() {
821        return Err(RedDBError::Query(format!(
822            "field '{field}' cannot be negative"
823        )));
824    }
825    if value.fract().abs() > f64::EPSILON {
826        return Err(RedDBError::Query(format!(
827            "field '{field}' must be an integer"
828        )));
829    }
830    if value > u64::MAX as f64 {
831        return Err(RedDBError::Query(format!("field '{field}' is too large")));
832    }
833    Ok(value as u64)
834}
835
836fn parse_patch_f32_vector(value: &JsonValue, field: &str) -> RedDBResult<Vec<f32>> {
837    let values = value
838        .as_array()
839        .ok_or_else(|| RedDBError::Query(format!("field '{field}' must be an array")))?;
840    let mut out = Vec::with_capacity(values.len());
841    for value in values {
842        let number = value.as_f64().ok_or_else(|| {
843            RedDBError::Query(format!("field '{field}' must contain only numbers"))
844        })?;
845        out.push(number as f32);
846    }
847    if out.is_empty() {
848        return Err(RedDBError::Query(format!(
849            "field '{field}' cannot be empty"
850        )));
851    }
852    Ok(out)
853}
854
855fn parse_sparse_index_array(value: &JsonValue, field: &str) -> RedDBResult<Vec<u32>> {
856    let values = value
857        .as_array()
858        .ok_or_else(|| RedDBError::Query(format!("field '{field}' must be an array")))?;
859    let mut out = Vec::with_capacity(values.len());
860    for value in values {
861        let value = value.as_f64().ok_or_else(|| {
862            RedDBError::Query(format!("field '{field}' must contain only integers"))
863        })?;
864        if value.is_sign_negative() || value.fract().abs() > f64::EPSILON {
865            return Err(RedDBError::Query(format!(
866                "field '{field}' must contain only u32 values"
867            )));
868        }
869        if value > u32::MAX as f64 {
870            return Err(RedDBError::Query(format!(
871                "field '{field}' value is too large"
872            )));
873        }
874        out.push(value as u32);
875    }
876    Ok(out)
877}
878
879fn parse_sparse_value_array(value: &JsonValue, field: &str) -> RedDBResult<Vec<f32>> {
880    parse_patch_f32_vector(value, field)
881}
882
883fn parse_sparse_vector_value(value: &JsonValue) -> RedDBResult<Option<SparseVector>> {
884    match value {
885        JsonValue::Null => Ok(None),
886        JsonValue::Object(object) => {
887            let indices = parse_sparse_index_array(
888                object.get("indices").ok_or_else(|| {
889                    RedDBError::Query("sparse metadata requires 'indices'".to_string())
890                })?,
891                "sparse.indices",
892            )?;
893            let values = parse_sparse_value_array(
894                object.get("values").ok_or_else(|| {
895                    RedDBError::Query("sparse metadata requires 'values'".to_string())
896                })?,
897                "sparse.values",
898            )?;
899            if indices.len() != values.len() {
900                return Err(RedDBError::Query(
901                    "sparse indices and values lengths must match".to_string(),
902                ));
903            }
904            let dimension = match object.get("dimension").and_then(JsonValue::as_f64) {
905                Some(value) => {
906                    if value.is_sign_negative() || value.fract().abs() > f64::EPSILON {
907                        return Err(RedDBError::Query(
908                            "sparse dimension must be a non-negative integer".to_string(),
909                        ));
910                    }
911                    if value > usize::MAX as f64 {
912                        return Err(RedDBError::Query(
913                            "sparse dimension is too large".to_string(),
914                        ));
915                    }
916                    value as usize
917                }
918                None => indices
919                    .iter()
920                    .max()
921                    .map_or(0, |index| (*index as usize) + 1),
922            };
923            if indices.iter().any(|index| (*index as usize) >= dimension) {
924                return Err(RedDBError::Query(
925                    "sparse indices must be smaller than dimension".to_string(),
926                ));
927            }
928            Ok(Some(SparseVector::new(indices, values, dimension)))
929        }
930        _ => Err(RedDBError::Query(
931            "field 'sparse' must be an object or null".to_string(),
932        )),
933    }
934}
935
936fn apply_patch_json_set(
937    target: &mut JsonValue,
938    path: &[String],
939    value: JsonValue,
940) -> Result<(), String> {
941    if path.is_empty() {
942        return Err("patch path cannot be empty".to_string());
943    }
944
945    let mut current = target;
946    for segment in &path[..path.len() - 1] {
947        let JsonValue::Object(object) = current else {
948            return Err("patch path target must be an object".to_string());
949        };
950        let value = object
951            .entry(segment.clone())
952            .or_insert_with(|| JsonValue::Object(Map::new()));
953        if !matches!(value, JsonValue::Object(_)) {
954            *value = JsonValue::Object(Map::new());
955        }
956        current = value;
957    }
958
959    let JsonValue::Object(object) = current else {
960        return Err("patch path target must be an object".to_string());
961    };
962    object.insert(path[path.len() - 1].clone(), value);
963    Ok(())
964}
965
966fn apply_patch_json_unset(target: &mut JsonValue, path: &[String]) -> Result<(), String> {
967    if path.is_empty() {
968        return Err("patch path cannot be empty".to_string());
969    }
970
971    if path.len() == 1 {
972        let JsonValue::Object(object) = target else {
973            return Err("patch path target must be an object".to_string());
974        };
975        object.remove(&path[0]);
976        return Ok(());
977    }
978
979    let mut current = target;
980    for segment in &path[..path.len() - 1] {
981        let Some(value) = (match current {
982            JsonValue::Object(object) => object.get_mut(segment),
983            _ => {
984                return Err("patch path target must be an object".to_string());
985            }
986        }) else {
987            return Ok(());
988        };
989
990        if !matches!(value, JsonValue::Object(_)) {
991            return Ok(());
992        }
993        current = value;
994    }
995
996    let JsonValue::Object(object) = current else {
997        return Err("patch path target must be an object".to_string());
998    };
999    object.remove(&path[path.len() - 1]);
1000    Ok(())
1001}
1002
1003fn format_mac(bytes: &[u8; 6]) -> String {
1004    bytes
1005        .iter()
1006        .map(|byte| format!("{byte:02x}"))
1007        .collect::<Vec<_>>()
1008        .join(":")
1009}
1010
1011#[cfg(test)]
1012mod damage_vector_tests {
1013    use super::*;
1014
1015    fn s(n: &str) -> String {
1016        n.to_string()
1017    }
1018
1019    #[test]
1020    fn identical_rows_produce_empty_vector() {
1021        let old = vec![
1022            (s("name"), Value::text(s("alice"))),
1023            (s("age"), Value::Integer(30)),
1024        ];
1025        let new = old.clone();
1026        let dv = row_damage_vector(&old, &new);
1027        assert!(dv.is_empty());
1028        assert!(dv.touched_columns().is_empty());
1029    }
1030
1031    #[test]
1032    fn detects_changed_column_only() {
1033        let old = vec![
1034            (s("name"), Value::text(s("alice"))),
1035            (s("age"), Value::Integer(30)),
1036        ];
1037        let new = vec![
1038            (s("name"), Value::text(s("alice"))),
1039            (s("age"), Value::Integer(31)),
1040        ];
1041        let dv = row_damage_vector(&old, &new);
1042        assert_eq!(dv.changed.len(), 1);
1043        assert_eq!(dv.changed[0].0, "age");
1044        assert!(dv.added.is_empty());
1045        assert!(dv.removed.is_empty());
1046        assert_eq!(dv.touched_columns(), vec!["age"]);
1047    }
1048
1049    #[test]
1050    fn detects_added_and_removed_columns() {
1051        let old = vec![
1052            (s("name"), Value::text(s("alice"))),
1053            (s("nickname"), Value::text(s("al"))),
1054        ];
1055        let new = vec![
1056            (s("name"), Value::text(s("alice"))),
1057            (s("email"), Value::text(s("a@x.com"))),
1058        ];
1059        let dv = row_damage_vector(&old, &new);
1060        assert!(dv.changed.is_empty());
1061        assert_eq!(dv.added.len(), 1);
1062        assert_eq!(dv.added[0].0, "email");
1063        assert_eq!(dv.removed.len(), 1);
1064        assert_eq!(dv.removed[0].0, "nickname");
1065    }
1066
1067    #[test]
1068    fn field_order_does_not_affect_diff() {
1069        // `(name, age)` and `(age, name)` with identical values should
1070        // diff as empty — column order is a presentation concern.
1071        let old = vec![
1072            (s("name"), Value::text(s("bob"))),
1073            (s("age"), Value::Integer(42)),
1074        ];
1075        let new = vec![
1076            (s("age"), Value::Integer(42)),
1077            (s("name"), Value::text(s("bob"))),
1078        ];
1079        assert!(row_damage_vector(&old, &new).is_empty());
1080    }
1081
1082    #[test]
1083    fn mixed_changed_added_removed() {
1084        let old = vec![
1085            (s("a"), Value::Integer(1)),
1086            (s("b"), Value::Integer(2)),
1087            (s("gone"), Value::text(s("x"))),
1088        ];
1089        let new = vec![
1090            (s("a"), Value::Integer(10)),     // changed
1091            (s("b"), Value::Integer(2)),      // unchanged
1092            (s("new"), Value::Boolean(true)), // added
1093        ];
1094        let dv = row_damage_vector(&old, &new);
1095        assert_eq!(dv.changed.len(), 1);
1096        assert_eq!(dv.added.len(), 1);
1097        assert_eq!(dv.removed.len(), 1);
1098        let mut touched: Vec<&str> = dv.touched_columns();
1099        touched.sort();
1100        assert_eq!(touched, vec!["a", "gone", "new"]);
1101    }
1102}