Skip to main content

reddb_server/runtime/
impl_tree.rs

1//! Tree management over graph collections.
2
3use crate::application::entity::json_to_metadata_value;
4use crate::application::ports::RuntimeEntityPort;
5use crate::storage::query::ast::{
6    CreateTreeQuery, DropTreeQuery, TreeCommand, TreeNodeSpec, TreePosition,
7};
8use crate::storage::unified::{Metadata, MetadataValue};
9
10use super::*;
11
/// Label given to the structural edges that link a parent node to a child node.
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
/// Prefix of the metadata keys reserved for managed trees; user-supplied node
/// metadata keys starting with it are rejected.
const TREE_METADATA_PREFIX: &str = "red.tree.";
/// Metadata key holding the name of the tree a node or structural edge belongs to.
const TREE_METADATA_NAME: &str = "red.tree.name";
/// Metadata key holding a node's own child-count limit.
const TREE_METADATA_MAX_CHILDREN: &str = "red.tree.max_children";
/// Metadata key holding the child index stored on a structural edge.
const TREE_METADATA_CHILD_INDEX: &str = "red.tree.child_index";
/// Value written to the `ownership` field of newly created tree definitions.
const TREE_OWNERSHIP_OWNED: &str = "owned";
/// Value written to the `auto_fix_mode` field of newly created tree definitions.
const TREE_AUTO_FIX_CONSERVATIVE: &str = "conservative";
19
/// A single problem reported by tree validation; each issue becomes one row of
/// the `VALIDATE` result.
#[derive(Debug, Clone)]
struct TreeIssue {
    /// Short static identifier of the issue, emitted in the `code` column.
    code: &'static str,
    /// Human-readable description, emitted in the `message` column.
    message: String,
    /// Entity the issue refers to, if any; emitted as null when absent.
    entity_id: Option<EntityId>,
}
26
/// Per-node information captured while loading a tree's state.
#[derive(Debug, Clone)]
struct TreeNodeState {
    /// Metadata read from the store for this node.
    metadata: Metadata,
    /// Child limit parsed from the node's `red.tree.max_children` metadata entry,
    /// if one is present.
    max_children_override: Option<usize>,
}
32
/// One parent -> child relationship of a tree, together with the edge entity
/// that represents it.
#[derive(Debug, Clone)]
struct TreeChildLink {
    /// Id of the edge entity backing this link.
    edge_id: EntityId,
    /// Id of the parent node.
    parent_id: EntityId,
    /// Id of the child node.
    child_id: EntityId,
    /// Position of the child among its siblings.
    // NOTE(review): presumably zero-based, matching the insert positions used by
    // the tree commands — confirm against the loader.
    child_index: usize,
}
40
/// In-memory snapshot of a tree, built by `load_tree_state` and consulted by
/// every tree command.
#[derive(Debug, Clone)]
struct TreeState {
    /// Member nodes keyed by node id.
    nodes: BTreeMap<EntityId, TreeNodeState>,
    /// Structural links keyed by the id of their edge entity.
    structural_edges: BTreeMap<EntityId, TreeChildLink>,
    /// Links grouped by parent node id.
    children_by_parent: BTreeMap<EntityId, Vec<TreeChildLink>>,
    /// The link that attaches each child to its parent, keyed by child node id.
    parent_by_child: BTreeMap<EntityId, TreeChildLink>,
}
48
/// Outcome of validating a `TreeState`.
#[derive(Debug, Clone)]
struct TreeValidation {
    /// Problems found; an empty list means the tree is reported as valid.
    issues: Vec<TreeIssue>,
    /// Depth recorded per node id.
    // NOTE(review): presumably measured from the root — confirm in the validator.
    depths: BTreeMap<EntityId, usize>,
}
54
/// One row of a rebalance plan: where a node currently sits and where the plan
/// places it. Every field is emitted as a column of the `REBALANCE` result.
#[derive(Debug, Clone)]
struct TreeRebalancePlanRow {
    /// Node this row describes.
    node_id: EntityId,
    /// Parent of the node before rebalancing.
    old_parent_id: EntityId,
    /// Parent of the node in the plan; used as the source of the recreated edge.
    new_parent_id: EntityId,
    /// Child index before rebalancing.
    old_index: usize,
    /// Child index in the plan; stored on the recreated edge.
    new_index: usize,
    /// Depth before rebalancing.
    old_depth: usize,
    /// Depth in the plan.
    new_depth: usize,
    /// Flag computed by the planner and reported in the `changed` column.
    changed: bool,
}
66
67impl RedDBRuntime {
68    pub fn execute_create_tree(
69        &self,
70        raw_query: &str,
71        query: &CreateTreeQuery,
72    ) -> RedDBResult<RuntimeQueryResult> {
73        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
74        if query.default_max_children == 0 {
75            return Err(RedDBError::Query(
76                "tree default max children must be positive".to_string(),
77            ));
78        }
79
80        if self
81            .inner
82            .db
83            .tree_definition(&query.collection, &query.name)
84            .is_some()
85        {
86            if query.if_not_exists {
87                return Ok(RuntimeQueryResult::ok_message(
88                    raw_query.to_string(),
89                    &format!("tree '{}.{}' already exists", query.collection, query.name),
90                    "create",
91                ));
92            }
93            return Err(RedDBError::Query(format!(
94                "tree '{}.{}' already exists",
95                query.collection, query.name
96            )));
97        }
98
99        let root = self.create_node_unchecked(self.build_tree_create_node_input(
100            &query.collection,
101            &query.name,
102            &query.root,
103            false,
104        )?)?;
105        self.ensure_tree_root_metadata(
106            &query.collection,
107            root.id,
108            &query.name,
109            query.root.max_children,
110        )?;
111        let now = current_tree_unix_ms();
112        let definition = crate::physical::PhysicalTreeDefinition {
113            collection: query.collection.clone(),
114            name: query.name.clone(),
115            root_id: root.id.raw(),
116            default_max_children: query.default_max_children,
117            ordered_children: true,
118            ownership: TREE_OWNERSHIP_OWNED.to_string(),
119            auto_fix_mode: TREE_AUTO_FIX_CONSERVATIVE.to_string(),
120            created_at_unix_ms: now,
121            updated_at_unix_ms: now,
122        };
123        self.inner
124            .db
125            .save_tree_definition(definition)
126            .map_err(|err| RedDBError::Internal(err.to_string()))?;
127        self.inner
128            .db
129            .persist_metadata()
130            .map_err(|err| RedDBError::Internal(err.to_string()))?;
131        self.note_table_write(&query.collection);
132        // Issue #120 — surface the tree name in the schema-vocabulary,
133        // scoped to its parent collection. Trees behave like an
134        // index in the lookup model.
135        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
136            collection: query.collection.clone(),
137            index: query.name.clone(),
138            columns: Vec::new(),
139        });
140
141        Ok(RuntimeQueryResult::ok_records(
142            raw_query.to_string(),
143            vec![
144                "collection".to_string(),
145                "tree_name".to_string(),
146                "root_id".to_string(),
147                "default_max_children".to_string(),
148            ],
149            vec![vec![
150                (
151                    "collection".to_string(),
152                    Value::text(query.collection.clone()),
153                ),
154                ("tree_name".to_string(), Value::text(query.name.clone())),
155                ("root_id".to_string(), Value::UnsignedInteger(root.id.raw())),
156                (
157                    "default_max_children".to_string(),
158                    Value::UnsignedInteger(query.default_max_children as u64),
159                ),
160            ]],
161            "create",
162        ))
163    }
164
165    pub fn execute_drop_tree(
166        &self,
167        raw_query: &str,
168        query: &DropTreeQuery,
169    ) -> RedDBResult<RuntimeQueryResult> {
170        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
171        let Some(definition) = self
172            .inner
173            .db
174            .tree_definition(&query.collection, &query.name)
175        else {
176            if query.if_exists {
177                return Ok(RuntimeQueryResult::ok_message(
178                    raw_query.to_string(),
179                    &format!("tree '{}.{}' does not exist", query.collection, query.name),
180                    "drop",
181                ));
182            }
183            return Err(RedDBError::NotFound(format!(
184                "tree '{}.{}' not found",
185                query.collection, query.name
186            )));
187        };
188
189        let state = self.load_tree_state(&definition)?;
190        let node_ids: BTreeSet<EntityId> = state.nodes.keys().copied().collect();
191        let edge_ids = self.graph_edges_touching_nodes(&definition.collection, &node_ids)?;
192
193        for edge_id in edge_ids {
194            let _ = self.delete_entity_internal(&definition.collection, edge_id)?;
195        }
196
197        let mut ordered_nodes: Vec<EntityId> = node_ids.into_iter().collect();
198        ordered_nodes.sort_by(|left, right| right.cmp(left));
199        for node_id in ordered_nodes {
200            let _ = self.delete_entity_internal(&definition.collection, node_id)?;
201        }
202
203        self.inner
204            .db
205            .remove_tree_definition(&query.collection, &query.name)
206            .map_err(|err| RedDBError::Internal(err.to_string()))?;
207        self.inner
208            .db
209            .persist_metadata()
210            .map_err(|err| RedDBError::Internal(err.to_string()))?;
211        self.note_table_write(&query.collection);
212        // Issue #120 — invalidate the schema-vocabulary tree entry.
213        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
214            collection: query.collection.clone(),
215            index: query.name.clone(),
216        });
217
218        Ok(RuntimeQueryResult::ok_message(
219            raw_query.to_string(),
220            &format!("tree '{}.{}' dropped", query.collection, query.name),
221            "drop",
222        ))
223    }
224
225    pub fn execute_tree_command(
226        &self,
227        raw_query: &str,
228        command: &TreeCommand,
229    ) -> RedDBResult<RuntimeQueryResult> {
230        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
231        match command {
232            TreeCommand::Insert {
233                collection,
234                tree_name,
235                parent_id,
236                node,
237                position,
238            } => self.execute_tree_insert(
239                raw_query,
240                collection,
241                tree_name,
242                EntityId::new(*parent_id),
243                node,
244                *position,
245            ),
246            TreeCommand::Move {
247                collection,
248                tree_name,
249                node_id,
250                parent_id,
251                position,
252            } => self.execute_tree_move(
253                raw_query,
254                collection,
255                tree_name,
256                EntityId::new(*node_id),
257                EntityId::new(*parent_id),
258                *position,
259            ),
260            TreeCommand::Delete {
261                collection,
262                tree_name,
263                node_id,
264            } => {
265                self.execute_tree_delete(raw_query, collection, tree_name, EntityId::new(*node_id))
266            }
267            TreeCommand::Validate {
268                collection,
269                tree_name,
270            } => self.execute_tree_validate(raw_query, collection, tree_name),
271            TreeCommand::Rebalance {
272                collection,
273                tree_name,
274                dry_run,
275            } => self.execute_tree_rebalance(raw_query, collection, tree_name, *dry_run),
276        }
277    }
278
279    fn execute_tree_insert(
280        &self,
281        raw_query: &str,
282        collection: &str,
283        tree_name: &str,
284        parent_id: EntityId,
285        node: &TreeNodeSpec,
286        position: TreePosition,
287    ) -> RedDBResult<RuntimeQueryResult> {
288        let definition = self.require_tree_definition(collection, tree_name)?;
289        let state = self.load_tree_state(&definition)?;
290        self.ensure_tree_operable(&definition, &state)?;
291
292        if !state.nodes.contains_key(&parent_id) {
293            return Err(RedDBError::NotFound(format!(
294                "parent node '{}' was not found in tree '{}.{}'",
295                parent_id.raw(),
296                collection,
297                tree_name
298            )));
299        }
300
301        let mut children = child_id_list(&state.children_by_parent, parent_id);
302        let insert_index = resolve_tree_insert_position(position, children.len())?;
303        let parent_limit = effective_max_children(&definition, &state, parent_id)?;
304        if children.len() >= parent_limit {
305            return Err(RedDBError::Query(format!(
306                "parent node '{}' is at capacity ({}) in tree '{}.{}'",
307                parent_id.raw(),
308                parent_limit,
309                collection,
310                tree_name
311            )));
312        }
313
314        let created = self.create_node_unchecked(
315            self.build_tree_create_node_input(collection, tree_name, node, true)?,
316        )?;
317        children.insert(insert_index, created.id);
318        self.rewrite_parent_children(collection, tree_name, parent_id, &children, &state)?;
319        self.touch_tree_definition_timestamp(&definition)?;
320        self.note_table_write(collection);
321
322        Ok(RuntimeQueryResult::ok_records(
323            raw_query.to_string(),
324            vec![
325                "collection".to_string(),
326                "tree_name".to_string(),
327                "node_id".to_string(),
328                "parent_id".to_string(),
329                "child_index".to_string(),
330            ],
331            vec![vec![
332                (
333                    "collection".to_string(),
334                    Value::text(collection.to_string()),
335                ),
336                ("tree_name".to_string(), Value::text(tree_name.to_string())),
337                (
338                    "node_id".to_string(),
339                    Value::UnsignedInteger(created.id.raw()),
340                ),
341                (
342                    "parent_id".to_string(),
343                    Value::UnsignedInteger(parent_id.raw()),
344                ),
345                (
346                    "child_index".to_string(),
347                    Value::UnsignedInteger(insert_index as u64),
348                ),
349            ]],
350            "insert",
351        ))
352    }
353
354    fn execute_tree_move(
355        &self,
356        raw_query: &str,
357        collection: &str,
358        tree_name: &str,
359        node_id: EntityId,
360        new_parent_id: EntityId,
361        position: TreePosition,
362    ) -> RedDBResult<RuntimeQueryResult> {
363        let definition = self.require_tree_definition(collection, tree_name)?;
364        let state = self.load_tree_state(&definition)?;
365        self.ensure_tree_operable(&definition, &state)?;
366
367        if node_id.raw() == definition.root_id {
368            return Err(RedDBError::Query("cannot move the tree root".to_string()));
369        }
370        if !state.nodes.contains_key(&node_id) {
371            return Err(RedDBError::NotFound(format!(
372                "node '{}' was not found in tree '{}.{}'",
373                node_id.raw(),
374                collection,
375                tree_name
376            )));
377        }
378        if !state.nodes.contains_key(&new_parent_id) {
379            return Err(RedDBError::NotFound(format!(
380                "parent node '{}' was not found in tree '{}.{}'",
381                new_parent_id.raw(),
382                collection,
383                tree_name
384            )));
385        }
386        if node_id == new_parent_id {
387            return Err(RedDBError::Query(
388                "node cannot be moved under itself".to_string(),
389            ));
390        }
391        if subtree_ids(&state, node_id).contains(&new_parent_id) {
392            return Err(RedDBError::Query(
393                "node cannot be moved under one of its descendants".to_string(),
394            ));
395        }
396
397        let current_parent = state
398            .parent_by_child
399            .get(&node_id)
400            .map(|link| link.parent_id)
401            .ok_or_else(|| {
402                RedDBError::Query(format!(
403                    "node '{}' does not have a structural parent in tree '{}.{}'",
404                    node_id.raw(),
405                    collection,
406                    tree_name
407                ))
408            })?;
409
410        let mut old_parent_children = child_id_list(&state.children_by_parent, current_parent);
411        old_parent_children.retain(|child| *child != node_id);
412
413        let mut new_parent_children = if current_parent == new_parent_id {
414            old_parent_children.clone()
415        } else {
416            child_id_list(&state.children_by_parent, new_parent_id)
417        };
418
419        let insert_index = resolve_tree_insert_position(position, new_parent_children.len())?;
420        let new_parent_limit = effective_max_children(&definition, &state, new_parent_id)?;
421        if current_parent != new_parent_id && new_parent_children.len() >= new_parent_limit {
422            return Err(RedDBError::Query(format!(
423                "parent node '{}' is at capacity ({}) in tree '{}.{}'",
424                new_parent_id.raw(),
425                new_parent_limit,
426                collection,
427                tree_name
428            )));
429        }
430
431        new_parent_children.insert(insert_index, node_id);
432
433        if current_parent == new_parent_id {
434            self.rewrite_parent_children(
435                collection,
436                tree_name,
437                current_parent,
438                &new_parent_children,
439                &state,
440            )?;
441        } else {
442            self.rewrite_parent_children(
443                collection,
444                tree_name,
445                current_parent,
446                &old_parent_children,
447                &state,
448            )?;
449            self.rewrite_parent_children(
450                collection,
451                tree_name,
452                new_parent_id,
453                &new_parent_children,
454                &state,
455            )?;
456        }
457
458        self.touch_tree_definition_timestamp(&definition)?;
459        self.note_table_write(collection);
460
461        Ok(RuntimeQueryResult::ok_records(
462            raw_query.to_string(),
463            vec![
464                "collection".to_string(),
465                "tree_name".to_string(),
466                "node_id".to_string(),
467                "old_parent_id".to_string(),
468                "new_parent_id".to_string(),
469                "child_index".to_string(),
470            ],
471            vec![vec![
472                (
473                    "collection".to_string(),
474                    Value::text(collection.to_string()),
475                ),
476                ("tree_name".to_string(), Value::text(tree_name.to_string())),
477                ("node_id".to_string(), Value::UnsignedInteger(node_id.raw())),
478                (
479                    "old_parent_id".to_string(),
480                    Value::UnsignedInteger(current_parent.raw()),
481                ),
482                (
483                    "new_parent_id".to_string(),
484                    Value::UnsignedInteger(new_parent_id.raw()),
485                ),
486                (
487                    "child_index".to_string(),
488                    Value::UnsignedInteger(insert_index as u64),
489                ),
490            ]],
491            "update",
492        ))
493    }
494
495    fn execute_tree_delete(
496        &self,
497        raw_query: &str,
498        collection: &str,
499        tree_name: &str,
500        node_id: EntityId,
501    ) -> RedDBResult<RuntimeQueryResult> {
502        let definition = self.require_tree_definition(collection, tree_name)?;
503        let state = self.load_tree_state(&definition)?;
504        self.ensure_tree_operable(&definition, &state)?;
505
506        if !state.nodes.contains_key(&node_id) {
507            return Err(RedDBError::NotFound(format!(
508                "node '{}' was not found in tree '{}.{}'",
509                node_id.raw(),
510                collection,
511                tree_name
512            )));
513        }
514
515        if node_id.raw() == definition.root_id {
516            return self.execute_drop_tree(
517                raw_query,
518                &DropTreeQuery {
519                    collection: collection.to_string(),
520                    name: tree_name.to_string(),
521                    if_exists: false,
522                },
523            );
524        }
525
526        let subtree = subtree_ids(&state, node_id);
527        let parent_id = state
528            .parent_by_child
529            .get(&node_id)
530            .map(|link| link.parent_id)
531            .ok_or_else(|| {
532                RedDBError::Query(format!(
533                    "node '{}' does not have a structural parent in tree '{}.{}'",
534                    node_id.raw(),
535                    collection,
536                    tree_name
537                ))
538            })?;
539        let surviving_children: Vec<EntityId> = child_id_list(&state.children_by_parent, parent_id)
540            .into_iter()
541            .filter(|child| *child != node_id)
542            .collect();
543
544        let edge_ids = self.graph_edges_touching_nodes(collection, &subtree)?;
545        for edge_id in edge_ids {
546            let _ = self.delete_entity_internal(collection, edge_id)?;
547        }
548
549        let mut ordered_nodes: Vec<EntityId> = subtree.iter().copied().collect();
550        ordered_nodes.sort_by(|left, right| right.cmp(left));
551        for subtree_node in ordered_nodes {
552            let _ = self.delete_entity_internal(collection, subtree_node)?;
553        }
554
555        self.rewrite_parent_children(
556            collection,
557            tree_name,
558            parent_id,
559            &surviving_children,
560            &state,
561        )?;
562        self.touch_tree_definition_timestamp(&definition)?;
563        self.note_table_write(collection);
564
565        Ok(RuntimeQueryResult::ok_records(
566            raw_query.to_string(),
567            vec![
568                "collection".to_string(),
569                "tree_name".to_string(),
570                "deleted_root_id".to_string(),
571                "deleted_node_count".to_string(),
572            ],
573            vec![vec![
574                (
575                    "collection".to_string(),
576                    Value::text(collection.to_string()),
577                ),
578                ("tree_name".to_string(), Value::text(tree_name.to_string())),
579                (
580                    "deleted_root_id".to_string(),
581                    Value::UnsignedInteger(node_id.raw()),
582                ),
583                (
584                    "deleted_node_count".to_string(),
585                    Value::UnsignedInteger(subtree.len() as u64),
586                ),
587            ]],
588            "delete",
589        ))
590    }
591
592    fn execute_tree_validate(
593        &self,
594        raw_query: &str,
595        collection: &str,
596        tree_name: &str,
597    ) -> RedDBResult<RuntimeQueryResult> {
598        let definition = self.require_tree_definition(collection, tree_name)?;
599        let state = self.load_tree_state(&definition)?;
600        let validation = self.validate_tree_state(&definition, &state);
601
602        if validation.issues.is_empty() {
603            return Ok(RuntimeQueryResult::ok_records(
604                raw_query.to_string(),
605                vec![
606                    "ok".to_string(),
607                    "code".to_string(),
608                    "message".to_string(),
609                    "entity_id".to_string(),
610                ],
611                vec![vec![
612                    ("ok".to_string(), Value::Boolean(true)),
613                    ("code".to_string(), Value::text("ok".to_string())),
614                    (
615                        "message".to_string(),
616                        Value::text("tree is valid".to_string()),
617                    ),
618                    ("entity_id".to_string(), Value::Null),
619                ]],
620                "select",
621            ));
622        }
623
624        let rows = validation
625            .issues
626            .iter()
627            .map(|issue| {
628                vec![
629                    ("ok".to_string(), Value::Boolean(false)),
630                    ("code".to_string(), Value::text(issue.code.to_string())),
631                    ("message".to_string(), Value::text(issue.message.clone())),
632                    (
633                        "entity_id".to_string(),
634                        issue
635                            .entity_id
636                            .map(|entity_id| Value::UnsignedInteger(entity_id.raw()))
637                            .unwrap_or(Value::Null),
638                    ),
639                ]
640            })
641            .collect();
642        Ok(RuntimeQueryResult::ok_records(
643            raw_query.to_string(),
644            vec![
645                "ok".to_string(),
646                "code".to_string(),
647                "message".to_string(),
648                "entity_id".to_string(),
649            ],
650            rows,
651            "select",
652        ))
653    }
654
655    fn execute_tree_rebalance(
656        &self,
657        raw_query: &str,
658        collection: &str,
659        tree_name: &str,
660        dry_run: bool,
661    ) -> RedDBResult<RuntimeQueryResult> {
662        let definition = self.require_tree_definition(collection, tree_name)?;
663        let state = self.load_tree_state(&definition)?;
664        self.ensure_tree_operable(&definition, &state)?;
665        let plan = self.plan_tree_rebalance(&definition, &state)?;
666
667        if !dry_run {
668            let existing_edges: Vec<EntityId> = state.structural_edges.keys().copied().collect();
669            for edge_id in existing_edges {
670                let _ = self.delete_entity_internal(collection, edge_id)?;
671            }
672
673            for row in &plan {
674                self.create_edge_unchecked(self.build_tree_structural_edge_input(
675                    collection,
676                    tree_name,
677                    row.new_parent_id,
678                    row.node_id,
679                    row.new_index,
680                ))?;
681            }
682
683            self.touch_tree_definition_timestamp(&definition)?;
684            self.note_table_write(collection);
685        }
686
687        let rows = plan
688            .iter()
689            .map(|row| {
690                vec![
691                    (
692                        "node_id".to_string(),
693                        Value::UnsignedInteger(row.node_id.raw()),
694                    ),
695                    (
696                        "old_parent_id".to_string(),
697                        Value::UnsignedInteger(row.old_parent_id.raw()),
698                    ),
699                    (
700                        "new_parent_id".to_string(),
701                        Value::UnsignedInteger(row.new_parent_id.raw()),
702                    ),
703                    (
704                        "old_index".to_string(),
705                        Value::UnsignedInteger(row.old_index as u64),
706                    ),
707                    (
708                        "new_index".to_string(),
709                        Value::UnsignedInteger(row.new_index as u64),
710                    ),
711                    (
712                        "old_depth".to_string(),
713                        Value::UnsignedInteger(row.old_depth as u64),
714                    ),
715                    (
716                        "new_depth".to_string(),
717                        Value::UnsignedInteger(row.new_depth as u64),
718                    ),
719                    ("changed".to_string(), Value::Boolean(row.changed)),
720                ]
721            })
722            .collect();
723
724        Ok(RuntimeQueryResult::ok_records(
725            raw_query.to_string(),
726            vec![
727                "node_id".to_string(),
728                "old_parent_id".to_string(),
729                "new_parent_id".to_string(),
730                "old_index".to_string(),
731                "new_index".to_string(),
732                "old_depth".to_string(),
733                "new_depth".to_string(),
734                "changed".to_string(),
735            ],
736            rows,
737            "select",
738        ))
739    }
740
741    fn build_tree_create_node_input(
742        &self,
743        collection: &str,
744        tree_name: &str,
745        node: &TreeNodeSpec,
746        allow_max_children: bool,
747    ) -> RedDBResult<crate::application::CreateNodeInput> {
748        if allow_max_children && node.max_children == Some(0) {
749            return Err(RedDBError::Query(
750                "node max children must be positive".to_string(),
751            ));
752        }
753        self.ensure_tree_spec_is_user_safe(node)?;
754        let mut metadata = build_tree_node_metadata(tree_name, &node.metadata)?;
755        if allow_max_children {
756            if let Some(max_children) = node.max_children {
757                metadata.push((
758                    TREE_METADATA_MAX_CHILDREN.to_string(),
759                    MetadataValue::Int(max_children as i64),
760                ));
761            }
762        }
763        Ok(crate::application::CreateNodeInput {
764            collection: collection.to_string(),
765            label: node.label.clone(),
766            node_type: node.node_type.clone(),
767            properties: node.properties.clone(),
768            metadata,
769            embeddings: Vec::new(),
770            table_links: Vec::new(),
771            node_links: Vec::new(),
772        })
773    }
774
775    fn build_tree_structural_edge_input(
776        &self,
777        collection: &str,
778        tree_name: &str,
779        parent_id: EntityId,
780        child_id: EntityId,
781        child_index: usize,
782    ) -> crate::application::CreateEdgeInput {
783        crate::application::CreateEdgeInput {
784            collection: collection.to_string(),
785            label: TREE_CHILD_EDGE_LABEL.to_string(),
786            from: parent_id,
787            to: child_id,
788            weight: Some(1.0),
789            properties: Vec::new(),
790            metadata: vec![
791                (
792                    TREE_METADATA_NAME.to_string(),
793                    MetadataValue::String(tree_name.to_string()),
794                ),
795                (
796                    TREE_METADATA_CHILD_INDEX.to_string(),
797                    MetadataValue::Int(child_index as i64),
798                ),
799            ],
800        }
801    }
802
803    fn ensure_tree_spec_is_user_safe(&self, node: &TreeNodeSpec) -> RedDBResult<()> {
804        for (key, _) in &node.metadata {
805            if key.starts_with(TREE_METADATA_PREFIX) {
806                return Err(RedDBError::Query(format!(
807                    "metadata key '{}' is reserved for managed trees",
808                    key
809                )));
810            }
811        }
812        Ok(())
813    }
814
815    fn require_tree_definition(
816        &self,
817        collection: &str,
818        tree_name: &str,
819    ) -> RedDBResult<crate::physical::PhysicalTreeDefinition> {
820        self.inner
821            .db
822            .tree_definition(collection, tree_name)
823            .ok_or_else(|| {
824                RedDBError::NotFound(format!("tree '{}.{}' not found", collection, tree_name))
825            })
826    }
827
828    fn touch_tree_definition_timestamp(
829        &self,
830        definition: &crate::physical::PhysicalTreeDefinition,
831    ) -> RedDBResult<()> {
832        let mut updated = definition.clone();
833        updated.updated_at_unix_ms = current_tree_unix_ms();
834        self.inner
835            .db
836            .save_tree_definition(updated)
837            .map_err(|err| RedDBError::Internal(err.to_string()))?;
838        self.inner
839            .db
840            .persist_metadata()
841            .map_err(|err| RedDBError::Internal(err.to_string()))?;
842        Ok(())
843    }
844
845    fn ensure_tree_root_metadata(
846        &self,
847        collection: &str,
848        root_id: EntityId,
849        tree_name: &str,
850        max_children_override: Option<usize>,
851    ) -> RedDBResult<()> {
852        let store = self.inner.db.store();
853        let mut metadata = store.get_metadata(collection, root_id).unwrap_or_default();
854        metadata.set(
855            TREE_METADATA_NAME.to_string(),
856            MetadataValue::String(tree_name.to_string()),
857        );
858        match max_children_override {
859            Some(value) => metadata.set(
860                TREE_METADATA_MAX_CHILDREN.to_string(),
861                MetadataValue::Int(value as i64),
862            ),
863            None => {
864                metadata.remove(TREE_METADATA_MAX_CHILDREN);
865            }
866        }
867        store
868            .set_metadata(collection, root_id, metadata)
869            .map_err(|err| RedDBError::Internal(err.to_string()))
870    }
871
872    fn load_tree_state(
873        &self,
874        definition: &crate::physical::PhysicalTreeDefinition,
875    ) -> RedDBResult<TreeState> {
876        let store = self.inner.db.store();
877        let Some(manager) = store.get_collection(&definition.collection) else {
878            return Ok(TreeState {
879                nodes: BTreeMap::new(),
880                structural_edges: BTreeMap::new(),
881                children_by_parent: BTreeMap::new(),
882                parent_by_child: BTreeMap::new(),
883            });
884        };
885
886        let mut all_nodes = BTreeMap::new();
887        let mut candidate_links = Vec::new();
888        let mut member_node_ids = BTreeSet::new();
889        let root_id = EntityId::new(definition.root_id);
890
891        for entity in manager.query_all(|entity| {
892            matches!(
893                entity.kind,
894                EntityKind::GraphNode(_) | EntityKind::GraphEdge(_)
895            )
896        }) {
897            let metadata = store
898                .get_metadata(&definition.collection, entity.id)
899                .unwrap_or_default();
900            match &entity.kind {
901                EntityKind::GraphNode(_) => {
902                    if metadata_tree_name(&metadata) == Some(definition.name.as_str()) {
903                        member_node_ids.insert(entity.id);
904                    }
905                    all_nodes.insert(
906                        entity.id,
907                        TreeNodeState {
908                            max_children_override: metadata_usize(
909                                &metadata,
910                                TREE_METADATA_MAX_CHILDREN,
911                            ),
912                            metadata,
913                        },
914                    );
915                }
916                EntityKind::GraphEdge(edge_kind) if edge_kind.label == TREE_CHILD_EDGE_LABEL => {
917                    let Some(parent_id) = parse_tree_entity_id_text(&edge_kind.from_node) else {
918                        continue;
919                    };
920                    let Some(child_id) = parse_tree_entity_id_text(&edge_kind.to_node) else {
921                        continue;
922                    };
923                    if metadata_tree_name(&metadata) == Some(definition.name.as_str()) {
924                        member_node_ids.insert(parent_id);
925                        member_node_ids.insert(child_id);
926                    }
927                    let child_index =
928                        metadata_usize(&metadata, TREE_METADATA_CHILD_INDEX).unwrap_or(usize::MAX);
929                    candidate_links.push(TreeChildLink {
930                        edge_id: entity.id,
931                        parent_id,
932                        child_id,
933                        child_index,
934                    });
935                }
936                _ => {}
937            }
938        }
939
940        let mut candidate_children_by_parent: BTreeMap<EntityId, Vec<TreeChildLink>> =
941            BTreeMap::new();
942        for link in &candidate_links {
943            candidate_children_by_parent
944                .entry(link.parent_id)
945                .or_default()
946                .push(link.clone());
947        }
948
949        for links in candidate_children_by_parent.values_mut() {
950            links.sort_by(|left, right| {
951                left.child_index
952                    .cmp(&right.child_index)
953                    .then_with(|| left.edge_id.cmp(&right.edge_id))
954                    .then_with(|| left.child_id.cmp(&right.child_id))
955            });
956            for (derived_index, link) in links.iter_mut().enumerate() {
957                link.child_index = derived_index;
958            }
959        }
960
961        let mut queue = VecDeque::new();
962        queue.push_back(root_id);
963        while let Some(node_id) = queue.pop_front() {
964            if !member_node_ids.insert(node_id) {
965                continue;
966            }
967            if let Some(children) = candidate_children_by_parent.get(&node_id) {
968                for child in children {
969                    member_node_ids.insert(child.parent_id);
970                    member_node_ids.insert(child.child_id);
971                    queue.push_back(child.child_id);
972                }
973            }
974        }
975
976        let mut nodes = BTreeMap::new();
977        for node_id in &member_node_ids {
978            if let Some(node) = all_nodes.get(node_id) {
979                nodes.insert(*node_id, node.clone());
980            }
981        }
982
983        let mut structural_edges = BTreeMap::new();
984        let mut children_by_parent: BTreeMap<EntityId, Vec<TreeChildLink>> = BTreeMap::new();
985        let mut parent_by_child = BTreeMap::new();
986        for links in candidate_children_by_parent.values() {
987            for link in links {
988                if !(member_node_ids.contains(&link.parent_id)
989                    && member_node_ids.contains(&link.child_id))
990                {
991                    continue;
992                }
993                structural_edges.insert(link.edge_id, link.clone());
994                children_by_parent
995                    .entry(link.parent_id)
996                    .or_default()
997                    .push(link.clone());
998                parent_by_child.insert(link.child_id, link.clone());
999            }
1000        }
1001
1002        for links in children_by_parent.values_mut() {
1003            links.sort_by(|left, right| {
1004                left.child_index
1005                    .cmp(&right.child_index)
1006                    .then_with(|| left.child_id.cmp(&right.child_id))
1007            });
1008        }
1009
1010        Ok(TreeState {
1011            nodes,
1012            structural_edges,
1013            children_by_parent,
1014            parent_by_child,
1015        })
1016    }
1017
1018    fn validate_tree_state(
1019        &self,
1020        definition: &crate::physical::PhysicalTreeDefinition,
1021        state: &TreeState,
1022    ) -> TreeValidation {
1023        let mut issues = Vec::new();
1024        let root_id = EntityId::new(definition.root_id);
1025        if !state.nodes.contains_key(&root_id) {
1026            issues.push(TreeIssue {
1027                code: "missing_root",
1028                message: format!(
1029                    "root node '{}' is missing from tree '{}.{}'",
1030                    definition.root_id, definition.collection, definition.name
1031                ),
1032                entity_id: Some(root_id),
1033            });
1034        }
1035
1036        for (node_id, node) in &state.nodes {
1037            match metadata_tree_name(&node.metadata) {
1038                Some(name) if name == definition.name => {}
1039                Some(_) => issues.push(TreeIssue {
1040                    code: "node_tree_name_mismatch",
1041                    message: format!(
1042                        "node '{}' is part of tree '{}.{}' but is missing '{}' metadata",
1043                        node_id.raw(),
1044                        definition.collection,
1045                        definition.name,
1046                        TREE_METADATA_NAME
1047                    ),
1048                    entity_id: Some(*node_id),
1049                }),
1050                None => {}
1051            }
1052            if metadata_invalid_max_children(&node.metadata) {
1053                issues.push(TreeIssue {
1054                    code: "invalid_max_children",
1055                    message: format!(
1056                        "node '{}' has invalid '{}' metadata",
1057                        node_id.raw(),
1058                        TREE_METADATA_MAX_CHILDREN
1059                    ),
1060                    entity_id: Some(*node_id),
1061                });
1062            }
1063        }
1064
1065        let mut parent_counts: BTreeMap<EntityId, usize> = BTreeMap::new();
1066        for link in state.structural_edges.values() {
1067            parent_counts
1068                .entry(link.child_id)
1069                .and_modify(|count| *count += 1)
1070                .or_insert(1);
1071            if !state.nodes.contains_key(&link.parent_id) {
1072                issues.push(TreeIssue {
1073                    code: "missing_parent_node",
1074                    message: format!(
1075                        "structural edge '{}' references missing parent node '{}'",
1076                        link.edge_id.raw(),
1077                        link.parent_id.raw()
1078                    ),
1079                    entity_id: Some(link.edge_id),
1080                });
1081            }
1082            if !state.nodes.contains_key(&link.child_id) {
1083                issues.push(TreeIssue {
1084                    code: "missing_child_node",
1085                    message: format!(
1086                        "structural edge '{}' references missing child node '{}'",
1087                        link.edge_id.raw(),
1088                        link.child_id.raw()
1089                    ),
1090                    entity_id: Some(link.edge_id),
1091                });
1092            }
1093        }
1094
1095        for (child_id, count) in parent_counts {
1096            if count > 1 {
1097                issues.push(TreeIssue {
1098                    code: "multiple_parents",
1099                    message: format!("node '{}' has multiple structural parents", child_id.raw()),
1100                    entity_id: Some(child_id),
1101                });
1102            }
1103        }
1104
1105        if let Some(link) = state.parent_by_child.get(&root_id) {
1106            issues.push(TreeIssue {
1107                code: "root_has_parent",
1108                message: format!(
1109                    "root node '{}' has a structural parent '{}'",
1110                    root_id.raw(),
1111                    link.parent_id.raw()
1112                ),
1113                entity_id: Some(root_id),
1114            });
1115        }
1116
1117        for node_id in state.nodes.keys() {
1118            if *node_id == root_id {
1119                continue;
1120            }
1121            if !state.parent_by_child.contains_key(node_id) {
1122                issues.push(TreeIssue {
1123                    code: "missing_parent",
1124                    message: format!(
1125                        "node '{}' is disconnected from root '{}'",
1126                        node_id.raw(),
1127                        root_id.raw()
1128                    ),
1129                    entity_id: Some(*node_id),
1130                });
1131            }
1132        }
1133
1134        for (parent_id, links) in &state.children_by_parent {
1135            let mut seen = BTreeSet::new();
1136            for (expected_index, link) in links.iter().enumerate() {
1137                if !seen.insert(link.child_index) {
1138                    issues.push(TreeIssue {
1139                        code: "duplicate_child_index",
1140                        message: format!(
1141                            "parent '{}' has duplicate child index '{}'",
1142                            parent_id.raw(),
1143                            link.child_index
1144                        ),
1145                        entity_id: Some(link.edge_id),
1146                    });
1147                }
1148                if link.child_index != expected_index {
1149                    issues.push(TreeIssue {
1150                        code: "non_compact_child_index",
1151                        message: format!(
1152                            "parent '{}' children must use compact indexes starting at zero",
1153                            parent_id.raw()
1154                        ),
1155                        entity_id: Some(link.edge_id),
1156                    });
1157                    break;
1158                }
1159            }
1160
1161            if let Ok(limit) = effective_max_children(definition, state, *parent_id) {
1162                if links.len() > limit {
1163                    issues.push(TreeIssue {
1164                        code: "max_children_exceeded",
1165                        message: format!(
1166                            "parent '{}' has {} children but limit is {}",
1167                            parent_id.raw(),
1168                            links.len(),
1169                            limit
1170                        ),
1171                        entity_id: Some(*parent_id),
1172                    });
1173                }
1174            }
1175        }
1176
1177        let mut depths = BTreeMap::new();
1178        let mut visited = BTreeSet::new();
1179        let mut stack = BTreeSet::new();
1180        if state.nodes.contains_key(&root_id) {
1181            validate_tree_cycles(
1182                root_id,
1183                0,
1184                state,
1185                &mut visited,
1186                &mut stack,
1187                &mut depths,
1188                &mut issues,
1189            );
1190        }
1191
1192        for node_id in state.nodes.keys() {
1193            if !depths.contains_key(node_id) {
1194                issues.push(TreeIssue {
1195                    code: "unreachable_node",
1196                    message: format!(
1197                        "node '{}' is not reachable from root '{}'",
1198                        node_id.raw(),
1199                        root_id.raw()
1200                    ),
1201                    entity_id: Some(*node_id),
1202                });
1203            }
1204        }
1205
1206        TreeValidation { issues, depths }
1207    }
1208
1209    fn ensure_tree_operable(
1210        &self,
1211        definition: &crate::physical::PhysicalTreeDefinition,
1212        state: &TreeState,
1213    ) -> RedDBResult<()> {
1214        let validation = self.validate_tree_state(definition, state);
1215        let blocking_issue = validation.issues.into_iter().find(|issue| {
1216            !matches!(
1217                issue.code,
1218                "duplicate_child_index" | "non_compact_child_index"
1219            )
1220        });
1221        if let Some(issue) = blocking_issue {
1222            return Err(RedDBError::Query(format!(
1223                "tree '{}.{}' is not operable: {}",
1224                definition.collection, definition.name, issue.message
1225            )));
1226        }
1227        Ok(())
1228    }
1229
1230    fn rewrite_parent_children(
1231        &self,
1232        collection: &str,
1233        tree_name: &str,
1234        parent_id: EntityId,
1235        child_ids: &[EntityId],
1236        current_state: &TreeState,
1237    ) -> RedDBResult<()> {
1238        if let Some(existing_links) = current_state.children_by_parent.get(&parent_id) {
1239            for link in existing_links {
1240                let _ = self.delete_entity_internal(collection, link.edge_id)?;
1241            }
1242        }
1243        for (index, child_id) in child_ids.iter().enumerate() {
1244            self.create_edge_unchecked(self.build_tree_structural_edge_input(
1245                collection, tree_name, parent_id, *child_id, index,
1246            ))?;
1247        }
1248        Ok(())
1249    }
1250
1251    fn graph_edges_touching_nodes(
1252        &self,
1253        collection: &str,
1254        nodes: &BTreeSet<EntityId>,
1255    ) -> RedDBResult<BTreeSet<EntityId>> {
1256        let store = self.inner.db.store();
1257        let Some(manager) = store.get_collection(collection) else {
1258            return Ok(BTreeSet::new());
1259        };
1260        let mut edge_ids = BTreeSet::new();
1261        for entity in manager.query_all(|entity| matches!(entity.kind, EntityKind::GraphEdge(_))) {
1262            let EntityKind::GraphEdge(edge_kind) = &entity.kind else {
1263                continue;
1264            };
1265            let Some(from_id) = parse_tree_entity_id_text(&edge_kind.from_node) else {
1266                continue;
1267            };
1268            let Some(to_id) = parse_tree_entity_id_text(&edge_kind.to_node) else {
1269                continue;
1270            };
1271            if nodes.contains(&from_id) || nodes.contains(&to_id) {
1272                edge_ids.insert(entity.id);
1273            }
1274        }
1275        Ok(edge_ids)
1276    }
1277
1278    fn delete_entity_internal(&self, collection: &str, id: EntityId) -> RedDBResult<bool> {
1279        let store = self.inner.db.store();
1280        let deleted = store
1281            .delete(collection, id)
1282            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1283        if deleted {
1284            store.context_index().remove_entity(id);
1285            self.cdc_emit(
1286                crate::replication::cdc::ChangeOperation::Delete,
1287                collection,
1288                id.raw(),
1289                "entity",
1290            );
1291        }
1292        Ok(deleted)
1293    }
1294
1295    fn plan_tree_rebalance(
1296        &self,
1297        definition: &crate::physical::PhysicalTreeDefinition,
1298        state: &TreeState,
1299    ) -> RedDBResult<Vec<TreeRebalancePlanRow>> {
1300        let validation = self.validate_tree_state(definition, state);
1301        let root_id = EntityId::new(definition.root_id);
1302        if !validation.depths.contains_key(&root_id) {
1303            return Err(RedDBError::Query(format!(
1304                "tree '{}.{}' is missing its root '{}'",
1305                definition.collection, definition.name, definition.root_id
1306            )));
1307        }
1308
1309        let ordered = flatten_tree_preorder(state, root_id);
1310        let mut assignment_queue = VecDeque::new();
1311        assignment_queue.push_back(root_id);
1312        let mut assigned_children_count: BTreeMap<EntityId, usize> = BTreeMap::new();
1313        let mut new_depths = BTreeMap::new();
1314        new_depths.insert(root_id, 0usize);
1315
1316        let mut plan = Vec::with_capacity(ordered.len());
1317        for node_id in ordered {
1318            loop {
1319                let Some(candidate_parent) = assignment_queue.front().copied() else {
1320                    return Err(RedDBError::Query(format!(
1321                        "tree '{}.{}' does not have enough aggregate capacity to rebalance",
1322                        definition.collection, definition.name
1323                    )));
1324                };
1325                let assigned = assigned_children_count
1326                    .get(&candidate_parent)
1327                    .copied()
1328                    .unwrap_or(0);
1329                let limit = effective_max_children(definition, state, candidate_parent)?;
1330                if assigned < limit {
1331                    break;
1332                }
1333                assignment_queue.pop_front();
1334            }
1335
1336            let parent_id = assignment_queue.front().copied().ok_or_else(|| {
1337                RedDBError::Query(format!(
1338                    "tree '{}.{}' does not have enough aggregate capacity to rebalance",
1339                    definition.collection, definition.name
1340                ))
1341            })?;
1342            let entry = assigned_children_count.entry(parent_id).or_insert(0);
1343            let new_index = *entry;
1344            *entry += 1;
1345            let parent_depth = new_depths.get(&parent_id).copied().unwrap_or(0);
1346            new_depths.insert(node_id, parent_depth + 1);
1347            assignment_queue.push_back(node_id);
1348
1349            let old_link = state.parent_by_child.get(&node_id).ok_or_else(|| {
1350                RedDBError::Query(format!(
1351                    "node '{}' is missing a structural parent in tree '{}.{}'",
1352                    node_id.raw(),
1353                    definition.collection,
1354                    definition.name
1355                ))
1356            })?;
1357            let old_depth = validation.depths.get(&node_id).copied().unwrap_or(0);
1358            let new_depth = new_depths.get(&node_id).copied().unwrap_or(0);
1359            let changed = old_link.parent_id != parent_id
1360                || old_link.child_index != new_index
1361                || old_depth != new_depth;
1362
1363            plan.push(TreeRebalancePlanRow {
1364                node_id,
1365                old_parent_id: old_link.parent_id,
1366                new_parent_id: parent_id,
1367                old_index: old_link.child_index,
1368                new_index,
1369                old_depth,
1370                new_depth,
1371                changed,
1372            });
1373        }
1374
1375        Ok(plan)
1376    }
1377}
1378
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn current_tree_unix_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
1385
1386fn build_tree_node_metadata(
1387    tree_name: &str,
1388    metadata_entries: &[(String, Value)],
1389) -> RedDBResult<Vec<(String, MetadataValue)>> {
1390    let mut metadata = Vec::with_capacity(metadata_entries.len() + 1);
1391    metadata.push((
1392        TREE_METADATA_NAME.to_string(),
1393        MetadataValue::String(tree_name.to_string()),
1394    ));
1395    for (key, value) in metadata_entries {
1396        metadata.push((key.clone(), storage_value_to_tree_metadata(value)?));
1397    }
1398    Ok(metadata)
1399}
1400
1401fn storage_value_to_tree_metadata(value: &Value) -> RedDBResult<MetadataValue> {
1402    Ok(match value {
1403        Value::Null => MetadataValue::Null,
1404        Value::Boolean(value) => MetadataValue::Bool(*value),
1405        Value::Integer(value) => MetadataValue::Int(*value),
1406        Value::UnsignedInteger(value) => {
1407            if *value <= i64::MAX as u64 {
1408                MetadataValue::Int(*value as i64)
1409            } else {
1410                MetadataValue::Timestamp(*value)
1411            }
1412        }
1413        Value::Float(value) => MetadataValue::Float(*value),
1414        Value::Text(value) => MetadataValue::String(value.to_string()),
1415        Value::Blob(value) => MetadataValue::Bytes(value.clone()),
1416        Value::Timestamp(value) => {
1417            if *value < 0 {
1418                return Err(RedDBError::Query(
1419                    "negative timestamps are not supported in tree metadata".to_string(),
1420                ));
1421            }
1422            MetadataValue::Timestamp(*value as u64)
1423        }
1424        Value::Json(bytes) => {
1425            let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
1426                RedDBError::Query(format!("failed to decode JSON metadata value: {err}"))
1427            })?;
1428            json_to_metadata_value(&json)?
1429        }
1430        Value::Vector(values) => MetadataValue::Array(
1431            values
1432                .iter()
1433                .map(|value| MetadataValue::Float(*value as f64))
1434                .collect(),
1435        ),
1436        Value::Array(values) => MetadataValue::Array(
1437            values
1438                .iter()
1439                .map(storage_value_to_tree_metadata)
1440                .collect::<RedDBResult<Vec<_>>>()?,
1441        ),
1442        other => MetadataValue::String(format!("{other:?}")),
1443    })
1444}
1445
1446fn resolve_tree_insert_position(position: TreePosition, len: usize) -> RedDBResult<usize> {
1447    match position {
1448        TreePosition::First => Ok(0),
1449        TreePosition::Last => Ok(len),
1450        TreePosition::Index(index) if index <= len => Ok(index),
1451        TreePosition::Index(index) => Err(RedDBError::Query(format!(
1452            "tree child position {} is out of bounds for {} children",
1453            index, len
1454        ))),
1455    }
1456}
1457
1458fn child_id_list(
1459    children_by_parent: &BTreeMap<EntityId, Vec<TreeChildLink>>,
1460    parent_id: EntityId,
1461) -> Vec<EntityId> {
1462    children_by_parent
1463        .get(&parent_id)
1464        .map(|links| links.iter().map(|link| link.child_id).collect())
1465        .unwrap_or_default()
1466}
1467
1468fn subtree_ids(state: &TreeState, root_id: EntityId) -> BTreeSet<EntityId> {
1469    let mut ids = BTreeSet::new();
1470    let mut queue = VecDeque::new();
1471    queue.push_back(root_id);
1472    while let Some(node_id) = queue.pop_front() {
1473        if !ids.insert(node_id) {
1474            continue;
1475        }
1476        if let Some(children) = state.children_by_parent.get(&node_id) {
1477            for child in children {
1478                queue.push_back(child.child_id);
1479            }
1480        }
1481    }
1482    ids
1483}
1484
1485fn flatten_tree_preorder(state: &TreeState, root_id: EntityId) -> Vec<EntityId> {
1486    let mut ordered = Vec::new();
1487    flatten_tree_preorder_inner(state, root_id, &mut ordered);
1488    ordered
1489}
1490
1491fn flatten_tree_preorder_inner(state: &TreeState, current: EntityId, ordered: &mut Vec<EntityId>) {
1492    if let Some(children) = state.children_by_parent.get(&current) {
1493        for child in children {
1494            ordered.push(child.child_id);
1495            flatten_tree_preorder_inner(state, child.child_id, ordered);
1496        }
1497    }
1498}
1499
1500fn validate_tree_cycles(
1501    node_id: EntityId,
1502    depth: usize,
1503    state: &TreeState,
1504    visited: &mut BTreeSet<EntityId>,
1505    stack: &mut BTreeSet<EntityId>,
1506    depths: &mut BTreeMap<EntityId, usize>,
1507    issues: &mut Vec<TreeIssue>,
1508) {
1509    if stack.contains(&node_id) {
1510        issues.push(TreeIssue {
1511            code: "cycle_detected",
1512            message: format!("cycle detected at node '{}'", node_id.raw()),
1513            entity_id: Some(node_id),
1514        });
1515        return;
1516    }
1517    if !visited.insert(node_id) {
1518        return;
1519    }
1520    stack.insert(node_id);
1521    depths.insert(node_id, depth);
1522    if let Some(children) = state.children_by_parent.get(&node_id) {
1523        for child in children {
1524            validate_tree_cycles(
1525                child.child_id,
1526                depth + 1,
1527                state,
1528                visited,
1529                stack,
1530                depths,
1531                issues,
1532            );
1533        }
1534    }
1535    stack.remove(&node_id);
1536}
1537
1538fn effective_max_children(
1539    definition: &crate::physical::PhysicalTreeDefinition,
1540    state: &TreeState,
1541    node_id: EntityId,
1542) -> RedDBResult<usize> {
1543    let Some(node) = state.nodes.get(&node_id) else {
1544        return Err(RedDBError::NotFound(format!(
1545            "node '{}' was not found in tree '{}.{}'",
1546            node_id.raw(),
1547            definition.collection,
1548            definition.name
1549        )));
1550    };
1551    if let Some(value) = node.max_children_override {
1552        if value == 0 {
1553            return Err(RedDBError::Query(format!(
1554                "node '{}' has non-positive max children override",
1555                node_id.raw()
1556            )));
1557        }
1558        return Ok(value);
1559    }
1560    Ok(definition.default_max_children)
1561}
1562
1563fn metadata_tree_name(metadata: &Metadata) -> Option<&str> {
1564    match metadata.get(TREE_METADATA_NAME) {
1565        Some(MetadataValue::String(value)) => Some(value.as_str()),
1566        _ => None,
1567    }
1568}
1569
1570fn metadata_usize(metadata: &Metadata, key: &str) -> Option<usize> {
1571    match metadata.get(key) {
1572        Some(MetadataValue::Int(value)) if *value >= 0 => Some(*value as usize),
1573        Some(MetadataValue::Timestamp(value)) => (*value).try_into().ok(),
1574        _ => None,
1575    }
1576}
1577
1578fn metadata_invalid_max_children(metadata: &Metadata) -> bool {
1579    match metadata.get(TREE_METADATA_MAX_CHILDREN) {
1580        None => false,
1581        Some(MetadataValue::Int(value)) => *value <= 0,
1582        Some(MetadataValue::Timestamp(value)) => *value == 0,
1583        _ => true,
1584    }
1585}
1586
1587fn parse_tree_entity_id_text(value: &str) -> Option<EntityId> {
1588    value
1589        .strip_prefix('e')
1590        .unwrap_or(value)
1591        .parse::<u64>()
1592        .ok()
1593        .filter(|value| *value > 0)
1594        .map(EntityId::new)
1595}