Skip to main content

kora_doc/
decompose.rs

1//! JSON-to-packed-document decomposition pipeline.
2//!
3//! [`Decomposer`] walks a `serde_json::Value` tree depth-first, flattening
4//! nested objects into dot-separated field paths (e.g., `address.city`). Each
5//! leaf field is:
6//!
7//! 1. Assigned a compact [`FieldId`](crate::registry::FieldId) via the
8//!    [`IdRegistry`](crate::registry::IdRegistry), creating the mapping on
9//!    first encounter.
10//! 2. Optionally dictionary-encoded through the collection's
11//!    [`ValueDictionary`](crate::dictionary::ValueDictionary) when the field's
12//!    observed cardinality is low and the value meets the minimum length
13//!    threshold.
14//! 3. Emitted as a [`FieldValue`](crate::packed::FieldValue) into a
15//!    [`PackedDocBuilder`](crate::packed::PackedDocBuilder).
16//!
//! Arrays and empty objects are stored as opaque JSON byte payloads
//! (`FieldValue::ArrayBytes`) to preserve exact round-trip fidelity. Field
//! names that are empty or contain a dot are rejected; the dot is reserved as
//! the path separator in the flattened schema.
21
22use serde_json::Value;
23use thiserror::Error;
24
25use crate::dictionary::{DictionaryError, StoredValue, ValueDictionary};
26use crate::packed::{FieldValue, PackedDoc, PackedDocBuilder, PackedDocError};
27use crate::registry::{CollectionId, IdRegistry, RegistryError};
28
29/// Errors returned when decomposing JSON into a packed document.
30#[derive(Debug, Error)]
31pub enum DecomposeError {
32    /// Root documents must be JSON objects.
33    #[error("root document must be a JSON object")]
34    RootMustBeObject,
35    /// Field names cannot be empty or contain reserved separator characters.
36    #[error("invalid field name '{field_name}' at path '{path}'")]
37    InvalidFieldName {
38        /// Invalid field name.
39        field_name: String,
40        /// Parent path where the field appeared.
41        path: String,
42    },
43    /// Numbers that cannot be represented in packed format are rejected.
44    #[error("number at path '{0}' is not representable as i64/f64")]
45    UnsupportedNumber(String),
46    /// Registry operation failed.
47    #[error(transparent)]
48    Registry(#[from] RegistryError),
49    /// Dictionary operation failed.
50    #[error(transparent)]
51    Dictionary(#[from] DictionaryError),
52    /// Packed document encoding failed.
53    #[error(transparent)]
54    Packed(#[from] PackedDocError),
55    /// Structured value payload serialization failed.
56    #[error("failed to serialize structured value for path '{path}': {message}")]
57    StructuredValueEncode {
58        /// Dotted field path.
59        path: String,
60        /// serde_json error string.
61        message: String,
62    },
63}
64
65/// JSON decomposer for one collection.
66pub struct Decomposer<'a> {
67    collection_id: CollectionId,
68    registry: &'a mut IdRegistry,
69    dictionary: &'a mut ValueDictionary,
70    packed_version: u16,
71}
72
73impl<'a> Decomposer<'a> {
74    /// Create a decomposer for one collection.
75    #[must_use]
76    pub fn new(
77        collection_id: CollectionId,
78        registry: &'a mut IdRegistry,
79        dictionary: &'a mut ValueDictionary,
80        packed_version: u16,
81    ) -> Self {
82        Self {
83            collection_id,
84            registry,
85            dictionary,
86            packed_version,
87        }
88    }
89
90    /// Decompose one JSON object into `PackedDoc`.
91    pub fn decompose(
92        &mut self,
93        json: &Value,
94        updated_at: u32,
95    ) -> Result<PackedDoc, DecomposeError> {
96        let Value::Object(map) = json else {
97            return Err(DecomposeError::RootMustBeObject);
98        };
99
100        let mut builder = PackedDocBuilder::new(self.packed_version);
101        for (key, value) in map {
102            validate_field_name("", key)?;
103            self.walk(key, value, &mut builder)?;
104        }
105        builder.build(updated_at).map_err(DecomposeError::from)
106    }
107
108    fn walk(
109        &mut self,
110        path: &str,
111        value: &Value,
112        builder: &mut PackedDocBuilder,
113    ) -> Result<(), DecomposeError> {
114        match value {
115            Value::Object(map) => {
116                if map.is_empty() {
117                    let field_id = self
118                        .registry
119                        .get_or_create_field_id(self.collection_id, path)?;
120                    builder.add_field(field_id, FieldValue::ArrayBytes(b"{}".to_vec()))?;
121                    return Ok(());
122                }
123
124                for (key, nested_value) in map {
125                    validate_field_name(path, key)?;
126                    let nested_path = join_path(path, key);
127                    self.walk(&nested_path, nested_value, builder)?;
128                }
129                Ok(())
130            }
131            Value::Array(_) => {
132                let field_id = self
133                    .registry
134                    .get_or_create_field_id(self.collection_id, path)?;
135                let payload = serde_json::to_vec(value).map_err(|err| {
136                    DecomposeError::StructuredValueEncode {
137                        path: path.to_string(),
138                        message: err.to_string(),
139                    }
140                })?;
141                builder.add_field(field_id, FieldValue::ArrayBytes(payload))?;
142                Ok(())
143            }
144            Value::String(string) => {
145                let field_id = self
146                    .registry
147                    .get_or_create_field_id(self.collection_id, path)?;
148                let stored = self.dictionary.encode(field_id, string.as_bytes())?;
149                let encoded = match stored {
150                    StoredValue::DictRef(id) => FieldValue::DictRef(id),
151                    StoredValue::Inline(bytes) => FieldValue::InlineBytes(bytes),
152                };
153                builder.add_field(field_id, encoded)?;
154                Ok(())
155            }
156            Value::Number(number) => {
157                let field_id = self
158                    .registry
159                    .get_or_create_field_id(self.collection_id, path)?;
160                if let Some(int) = number.as_i64() {
161                    builder.add_field(field_id, FieldValue::I64(int))?;
162                    return Ok(());
163                }
164                if let Some(float) = number.as_f64() {
165                    builder.add_field(field_id, FieldValue::F64(float))?;
166                    return Ok(());
167                }
168                Err(DecomposeError::UnsupportedNumber(path.to_string()))
169            }
170            Value::Bool(value) => {
171                let field_id = self
172                    .registry
173                    .get_or_create_field_id(self.collection_id, path)?;
174                builder.add_field(field_id, FieldValue::Bool(*value))?;
175                Ok(())
176            }
177            Value::Null => {
178                let field_id = self
179                    .registry
180                    .get_or_create_field_id(self.collection_id, path)?;
181                builder.add_field(field_id, FieldValue::Null)?;
182                Ok(())
183            }
184        }
185    }
186}
187
/// Join a parent path and a child field name into one dotted path.
///
/// An empty `parent` denotes the document root, so `child` is returned
/// unchanged instead of gaining a leading separator. Otherwise the result is
/// `parent`, a `.`, then `child`.
fn join_path(parent: &str, child: &str) -> String {
    if parent.is_empty() {
        return child.to_string();
    }

    // Size the buffer once: parent + separator + child.
    let mut joined = String::with_capacity(parent.len() + 1 + child.len());
    joined.push_str(parent);
    joined.push('.');
    joined.push_str(child);
    joined
}
191
192fn validate_field_name(path: &str, field_name: &str) -> Result<(), DecomposeError> {
193    if field_name.is_empty() || field_name.contains('.') {
194        return Err(DecomposeError::InvalidFieldName {
195            field_name: field_name.to_string(),
196            path: path.to_string(),
197        });
198    }
199    Ok(())
200}
201
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::dictionary::ValueDictionaryConfig;
    use crate::recompose::Recomposer;

    use super::*;

    /// Allocate the `users` collection that every test decomposes into.
    fn users_collection(registry: &mut IdRegistry) -> CollectionId {
        registry
            .get_or_create_collection_id("users")
            .expect("collection id should allocate")
    }

    /// A nested document with a string, a bool, an object and an array must
    /// survive a decompose/recompose round trip unchanged.
    #[test]
    fn decomposes_and_recomposes_nested_document() {
        let config = ValueDictionaryConfig {
            low_cardinality_threshold: 1_000,
            min_len_for_dictionary: 2,
        };
        let mut dictionary = ValueDictionary::new(config);
        let mut registry = IdRegistry::new();
        let collection_id = users_collection(&mut registry);

        let source = json!({
            "name": "Augustus",
            "active": true,
            "address": {"city": "Accra"},
            "tags": ["rust", "systems"]
        });

        let mut decomposer = Decomposer::new(collection_id, &mut registry, &mut dictionary, 1);
        let packed = decomposer
            .decompose(&source, 42)
            .expect("decomposition should work");

        let recomposed = Recomposer::recompose(&packed, &registry, &dictionary, collection_id)
            .expect("recompose should work");
        assert_eq!(recomposed, source);
    }

    /// Anything other than an object at the root is refused.
    #[test]
    fn rejects_non_object_root() {
        let mut registry = IdRegistry::new();
        let mut dictionary = ValueDictionary::default();
        let collection_id = users_collection(&mut registry);
        let mut decomposer = Decomposer::new(collection_id, &mut registry, &mut dictionary, 1);

        let outcome = decomposer.decompose(&json!("string"), 0);

        let err = outcome.expect_err("non-object root should fail");
        assert!(matches!(err, DecomposeError::RootMustBeObject));
    }

    /// A key containing the path separator is refused and reported verbatim.
    #[test]
    fn rejects_field_names_with_dots() {
        let mut registry = IdRegistry::new();
        let mut dictionary = ValueDictionary::default();
        let collection_id = users_collection(&mut registry);
        let mut decomposer = Decomposer::new(collection_id, &mut registry, &mut dictionary, 1);

        let document = json!({ "address.city": "Accra" });
        let err = decomposer
            .decompose(&document, 0)
            .expect_err("dotted field names are ambiguous and must fail");

        match err {
            DecomposeError::InvalidFieldName { field_name, .. } => {
                assert_eq!(field_name, "address.city");
            }
            other => panic!("expected InvalidFieldName, got {other:?}"),
        }
    }
}
271}