1use crate::application::entity::json_to_metadata_value;
4use crate::application::ports::RuntimeEntityPort;
5use crate::storage::query::ast::{
6 CreateTreeQuery, DropTreeQuery, TreeCommand, TreeNodeSpec, TreePosition,
7};
8use crate::storage::unified::{Metadata, MetadataValue};
9
10use super::*;
11
12const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
13const TREE_METADATA_PREFIX: &str = "red.tree.";
14const TREE_METADATA_NAME: &str = "red.tree.name";
15const TREE_METADATA_MAX_CHILDREN: &str = "red.tree.max_children";
16const TREE_METADATA_CHILD_INDEX: &str = "red.tree.child_index";
17const TREE_OWNERSHIP_OWNED: &str = "owned";
18const TREE_AUTO_FIX_CONSERVATIVE: &str = "conservative";
19
20#[derive(Debug, Clone)]
21struct TreeIssue {
22 code: &'static str,
23 message: String,
24 entity_id: Option<EntityId>,
25}
26
27#[derive(Debug, Clone)]
28struct TreeNodeState {
29 metadata: Metadata,
30 max_children_override: Option<usize>,
31}
32
33#[derive(Debug, Clone)]
34struct TreeChildLink {
35 edge_id: EntityId,
36 parent_id: EntityId,
37 child_id: EntityId,
38 child_index: usize,
39}
40
41#[derive(Debug, Clone)]
42struct TreeState {
43 nodes: BTreeMap<EntityId, TreeNodeState>,
44 structural_edges: BTreeMap<EntityId, TreeChildLink>,
45 children_by_parent: BTreeMap<EntityId, Vec<TreeChildLink>>,
46 parent_by_child: BTreeMap<EntityId, TreeChildLink>,
47}
48
49#[derive(Debug, Clone)]
50struct TreeValidation {
51 issues: Vec<TreeIssue>,
52 depths: BTreeMap<EntityId, usize>,
53}
54
55#[derive(Debug, Clone)]
56struct TreeRebalancePlanRow {
57 node_id: EntityId,
58 old_parent_id: EntityId,
59 new_parent_id: EntityId,
60 old_index: usize,
61 new_index: usize,
62 old_depth: usize,
63 new_depth: usize,
64 changed: bool,
65}
66
67impl RedDBRuntime {
68 pub fn execute_create_tree(
69 &self,
70 raw_query: &str,
71 query: &CreateTreeQuery,
72 ) -> RedDBResult<RuntimeQueryResult> {
73 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
74 if query.default_max_children == 0 {
75 return Err(RedDBError::Query(
76 "tree default max children must be positive".to_string(),
77 ));
78 }
79
80 if self
81 .inner
82 .db
83 .tree_definition(&query.collection, &query.name)
84 .is_some()
85 {
86 if query.if_not_exists {
87 return Ok(RuntimeQueryResult::ok_message(
88 raw_query.to_string(),
89 &format!("tree '{}.{}' already exists", query.collection, query.name),
90 "create",
91 ));
92 }
93 return Err(RedDBError::Query(format!(
94 "tree '{}.{}' already exists",
95 query.collection, query.name
96 )));
97 }
98
99 let root = self.create_node_unchecked(self.build_tree_create_node_input(
100 &query.collection,
101 &query.name,
102 &query.root,
103 false,
104 )?)?;
105 self.ensure_tree_root_metadata(
106 &query.collection,
107 root.id,
108 &query.name,
109 query.root.max_children,
110 )?;
111 let now = current_tree_unix_ms();
112 let definition = crate::physical::PhysicalTreeDefinition {
113 collection: query.collection.clone(),
114 name: query.name.clone(),
115 root_id: root.id.raw(),
116 default_max_children: query.default_max_children,
117 ordered_children: true,
118 ownership: TREE_OWNERSHIP_OWNED.to_string(),
119 auto_fix_mode: TREE_AUTO_FIX_CONSERVATIVE.to_string(),
120 created_at_unix_ms: now,
121 updated_at_unix_ms: now,
122 };
123 self.inner
124 .db
125 .save_tree_definition(definition)
126 .map_err(|err| RedDBError::Internal(err.to_string()))?;
127 self.inner
128 .db
129 .persist_metadata()
130 .map_err(|err| RedDBError::Internal(err.to_string()))?;
131 self.note_table_write(&query.collection);
132 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
136 collection: query.collection.clone(),
137 index: query.name.clone(),
138 columns: Vec::new(),
139 });
140
141 Ok(RuntimeQueryResult::ok_records(
142 raw_query.to_string(),
143 vec![
144 "collection".to_string(),
145 "tree_name".to_string(),
146 "root_id".to_string(),
147 "default_max_children".to_string(),
148 ],
149 vec![vec![
150 (
151 "collection".to_string(),
152 Value::text(query.collection.clone()),
153 ),
154 ("tree_name".to_string(), Value::text(query.name.clone())),
155 ("root_id".to_string(), Value::UnsignedInteger(root.id.raw())),
156 (
157 "default_max_children".to_string(),
158 Value::UnsignedInteger(query.default_max_children as u64),
159 ),
160 ]],
161 "create",
162 ))
163 }
164
165 pub fn execute_drop_tree(
166 &self,
167 raw_query: &str,
168 query: &DropTreeQuery,
169 ) -> RedDBResult<RuntimeQueryResult> {
170 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
171 let Some(definition) = self
172 .inner
173 .db
174 .tree_definition(&query.collection, &query.name)
175 else {
176 if query.if_exists {
177 return Ok(RuntimeQueryResult::ok_message(
178 raw_query.to_string(),
179 &format!("tree '{}.{}' does not exist", query.collection, query.name),
180 "drop",
181 ));
182 }
183 return Err(RedDBError::NotFound(format!(
184 "tree '{}.{}' not found",
185 query.collection, query.name
186 )));
187 };
188
189 let state = self.load_tree_state(&definition)?;
190 let node_ids: BTreeSet<EntityId> = state.nodes.keys().copied().collect();
191 let edge_ids = self.graph_edges_touching_nodes(&definition.collection, &node_ids)?;
192
193 for edge_id in edge_ids {
194 let _ = self.delete_entity_internal(&definition.collection, edge_id)?;
195 }
196
197 let mut ordered_nodes: Vec<EntityId> = node_ids.into_iter().collect();
198 ordered_nodes.sort_by(|left, right| right.cmp(left));
199 for node_id in ordered_nodes {
200 let _ = self.delete_entity_internal(&definition.collection, node_id)?;
201 }
202
203 self.inner
204 .db
205 .remove_tree_definition(&query.collection, &query.name)
206 .map_err(|err| RedDBError::Internal(err.to_string()))?;
207 self.inner
208 .db
209 .persist_metadata()
210 .map_err(|err| RedDBError::Internal(err.to_string()))?;
211 self.note_table_write(&query.collection);
212 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
214 collection: query.collection.clone(),
215 index: query.name.clone(),
216 });
217
218 Ok(RuntimeQueryResult::ok_message(
219 raw_query.to_string(),
220 &format!("tree '{}.{}' dropped", query.collection, query.name),
221 "drop",
222 ))
223 }
224
225 pub fn execute_tree_command(
226 &self,
227 raw_query: &str,
228 command: &TreeCommand,
229 ) -> RedDBResult<RuntimeQueryResult> {
230 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
231 match command {
232 TreeCommand::Insert {
233 collection,
234 tree_name,
235 parent_id,
236 node,
237 position,
238 } => self.execute_tree_insert(
239 raw_query,
240 collection,
241 tree_name,
242 EntityId::new(*parent_id),
243 node,
244 *position,
245 ),
246 TreeCommand::Move {
247 collection,
248 tree_name,
249 node_id,
250 parent_id,
251 position,
252 } => self.execute_tree_move(
253 raw_query,
254 collection,
255 tree_name,
256 EntityId::new(*node_id),
257 EntityId::new(*parent_id),
258 *position,
259 ),
260 TreeCommand::Delete {
261 collection,
262 tree_name,
263 node_id,
264 } => {
265 self.execute_tree_delete(raw_query, collection, tree_name, EntityId::new(*node_id))
266 }
267 TreeCommand::Validate {
268 collection,
269 tree_name,
270 } => self.execute_tree_validate(raw_query, collection, tree_name),
271 TreeCommand::Rebalance {
272 collection,
273 tree_name,
274 dry_run,
275 } => self.execute_tree_rebalance(raw_query, collection, tree_name, *dry_run),
276 }
277 }
278
279 fn execute_tree_insert(
280 &self,
281 raw_query: &str,
282 collection: &str,
283 tree_name: &str,
284 parent_id: EntityId,
285 node: &TreeNodeSpec,
286 position: TreePosition,
287 ) -> RedDBResult<RuntimeQueryResult> {
288 let definition = self.require_tree_definition(collection, tree_name)?;
289 let state = self.load_tree_state(&definition)?;
290 self.ensure_tree_operable(&definition, &state)?;
291
292 if !state.nodes.contains_key(&parent_id) {
293 return Err(RedDBError::NotFound(format!(
294 "parent node '{}' was not found in tree '{}.{}'",
295 parent_id.raw(),
296 collection,
297 tree_name
298 )));
299 }
300
301 let mut children = child_id_list(&state.children_by_parent, parent_id);
302 let insert_index = resolve_tree_insert_position(position, children.len())?;
303 let parent_limit = effective_max_children(&definition, &state, parent_id)?;
304 if children.len() >= parent_limit {
305 return Err(RedDBError::Query(format!(
306 "parent node '{}' is at capacity ({}) in tree '{}.{}'",
307 parent_id.raw(),
308 parent_limit,
309 collection,
310 tree_name
311 )));
312 }
313
314 let created = self.create_node_unchecked(
315 self.build_tree_create_node_input(collection, tree_name, node, true)?,
316 )?;
317 children.insert(insert_index, created.id);
318 self.rewrite_parent_children(collection, tree_name, parent_id, &children, &state)?;
319 self.touch_tree_definition_timestamp(&definition)?;
320 self.note_table_write(collection);
321
322 Ok(RuntimeQueryResult::ok_records(
323 raw_query.to_string(),
324 vec![
325 "collection".to_string(),
326 "tree_name".to_string(),
327 "node_id".to_string(),
328 "parent_id".to_string(),
329 "child_index".to_string(),
330 ],
331 vec![vec![
332 (
333 "collection".to_string(),
334 Value::text(collection.to_string()),
335 ),
336 ("tree_name".to_string(), Value::text(tree_name.to_string())),
337 (
338 "node_id".to_string(),
339 Value::UnsignedInteger(created.id.raw()),
340 ),
341 (
342 "parent_id".to_string(),
343 Value::UnsignedInteger(parent_id.raw()),
344 ),
345 (
346 "child_index".to_string(),
347 Value::UnsignedInteger(insert_index as u64),
348 ),
349 ]],
350 "insert",
351 ))
352 }
353
354 fn execute_tree_move(
355 &self,
356 raw_query: &str,
357 collection: &str,
358 tree_name: &str,
359 node_id: EntityId,
360 new_parent_id: EntityId,
361 position: TreePosition,
362 ) -> RedDBResult<RuntimeQueryResult> {
363 let definition = self.require_tree_definition(collection, tree_name)?;
364 let state = self.load_tree_state(&definition)?;
365 self.ensure_tree_operable(&definition, &state)?;
366
367 if node_id.raw() == definition.root_id {
368 return Err(RedDBError::Query("cannot move the tree root".to_string()));
369 }
370 if !state.nodes.contains_key(&node_id) {
371 return Err(RedDBError::NotFound(format!(
372 "node '{}' was not found in tree '{}.{}'",
373 node_id.raw(),
374 collection,
375 tree_name
376 )));
377 }
378 if !state.nodes.contains_key(&new_parent_id) {
379 return Err(RedDBError::NotFound(format!(
380 "parent node '{}' was not found in tree '{}.{}'",
381 new_parent_id.raw(),
382 collection,
383 tree_name
384 )));
385 }
386 if node_id == new_parent_id {
387 return Err(RedDBError::Query(
388 "node cannot be moved under itself".to_string(),
389 ));
390 }
391 if subtree_ids(&state, node_id).contains(&new_parent_id) {
392 return Err(RedDBError::Query(
393 "node cannot be moved under one of its descendants".to_string(),
394 ));
395 }
396
397 let current_parent = state
398 .parent_by_child
399 .get(&node_id)
400 .map(|link| link.parent_id)
401 .ok_or_else(|| {
402 RedDBError::Query(format!(
403 "node '{}' does not have a structural parent in tree '{}.{}'",
404 node_id.raw(),
405 collection,
406 tree_name
407 ))
408 })?;
409
410 let mut old_parent_children = child_id_list(&state.children_by_parent, current_parent);
411 old_parent_children.retain(|child| *child != node_id);
412
413 let mut new_parent_children = if current_parent == new_parent_id {
414 old_parent_children.clone()
415 } else {
416 child_id_list(&state.children_by_parent, new_parent_id)
417 };
418
419 let insert_index = resolve_tree_insert_position(position, new_parent_children.len())?;
420 let new_parent_limit = effective_max_children(&definition, &state, new_parent_id)?;
421 if current_parent != new_parent_id && new_parent_children.len() >= new_parent_limit {
422 return Err(RedDBError::Query(format!(
423 "parent node '{}' is at capacity ({}) in tree '{}.{}'",
424 new_parent_id.raw(),
425 new_parent_limit,
426 collection,
427 tree_name
428 )));
429 }
430
431 new_parent_children.insert(insert_index, node_id);
432
433 if current_parent == new_parent_id {
434 self.rewrite_parent_children(
435 collection,
436 tree_name,
437 current_parent,
438 &new_parent_children,
439 &state,
440 )?;
441 } else {
442 self.rewrite_parent_children(
443 collection,
444 tree_name,
445 current_parent,
446 &old_parent_children,
447 &state,
448 )?;
449 self.rewrite_parent_children(
450 collection,
451 tree_name,
452 new_parent_id,
453 &new_parent_children,
454 &state,
455 )?;
456 }
457
458 self.touch_tree_definition_timestamp(&definition)?;
459 self.note_table_write(collection);
460
461 Ok(RuntimeQueryResult::ok_records(
462 raw_query.to_string(),
463 vec![
464 "collection".to_string(),
465 "tree_name".to_string(),
466 "node_id".to_string(),
467 "old_parent_id".to_string(),
468 "new_parent_id".to_string(),
469 "child_index".to_string(),
470 ],
471 vec![vec![
472 (
473 "collection".to_string(),
474 Value::text(collection.to_string()),
475 ),
476 ("tree_name".to_string(), Value::text(tree_name.to_string())),
477 ("node_id".to_string(), Value::UnsignedInteger(node_id.raw())),
478 (
479 "old_parent_id".to_string(),
480 Value::UnsignedInteger(current_parent.raw()),
481 ),
482 (
483 "new_parent_id".to_string(),
484 Value::UnsignedInteger(new_parent_id.raw()),
485 ),
486 (
487 "child_index".to_string(),
488 Value::UnsignedInteger(insert_index as u64),
489 ),
490 ]],
491 "update",
492 ))
493 }
494
495 fn execute_tree_delete(
496 &self,
497 raw_query: &str,
498 collection: &str,
499 tree_name: &str,
500 node_id: EntityId,
501 ) -> RedDBResult<RuntimeQueryResult> {
502 let definition = self.require_tree_definition(collection, tree_name)?;
503 let state = self.load_tree_state(&definition)?;
504 self.ensure_tree_operable(&definition, &state)?;
505
506 if !state.nodes.contains_key(&node_id) {
507 return Err(RedDBError::NotFound(format!(
508 "node '{}' was not found in tree '{}.{}'",
509 node_id.raw(),
510 collection,
511 tree_name
512 )));
513 }
514
515 if node_id.raw() == definition.root_id {
516 return self.execute_drop_tree(
517 raw_query,
518 &DropTreeQuery {
519 collection: collection.to_string(),
520 name: tree_name.to_string(),
521 if_exists: false,
522 },
523 );
524 }
525
526 let subtree = subtree_ids(&state, node_id);
527 let parent_id = state
528 .parent_by_child
529 .get(&node_id)
530 .map(|link| link.parent_id)
531 .ok_or_else(|| {
532 RedDBError::Query(format!(
533 "node '{}' does not have a structural parent in tree '{}.{}'",
534 node_id.raw(),
535 collection,
536 tree_name
537 ))
538 })?;
539 let surviving_children: Vec<EntityId> = child_id_list(&state.children_by_parent, parent_id)
540 .into_iter()
541 .filter(|child| *child != node_id)
542 .collect();
543
544 let edge_ids = self.graph_edges_touching_nodes(collection, &subtree)?;
545 for edge_id in edge_ids {
546 let _ = self.delete_entity_internal(collection, edge_id)?;
547 }
548
549 let mut ordered_nodes: Vec<EntityId> = subtree.iter().copied().collect();
550 ordered_nodes.sort_by(|left, right| right.cmp(left));
551 for subtree_node in ordered_nodes {
552 let _ = self.delete_entity_internal(collection, subtree_node)?;
553 }
554
555 self.rewrite_parent_children(
556 collection,
557 tree_name,
558 parent_id,
559 &surviving_children,
560 &state,
561 )?;
562 self.touch_tree_definition_timestamp(&definition)?;
563 self.note_table_write(collection);
564
565 Ok(RuntimeQueryResult::ok_records(
566 raw_query.to_string(),
567 vec![
568 "collection".to_string(),
569 "tree_name".to_string(),
570 "deleted_root_id".to_string(),
571 "deleted_node_count".to_string(),
572 ],
573 vec![vec![
574 (
575 "collection".to_string(),
576 Value::text(collection.to_string()),
577 ),
578 ("tree_name".to_string(), Value::text(tree_name.to_string())),
579 (
580 "deleted_root_id".to_string(),
581 Value::UnsignedInteger(node_id.raw()),
582 ),
583 (
584 "deleted_node_count".to_string(),
585 Value::UnsignedInteger(subtree.len() as u64),
586 ),
587 ]],
588 "delete",
589 ))
590 }
591
592 fn execute_tree_validate(
593 &self,
594 raw_query: &str,
595 collection: &str,
596 tree_name: &str,
597 ) -> RedDBResult<RuntimeQueryResult> {
598 let definition = self.require_tree_definition(collection, tree_name)?;
599 let state = self.load_tree_state(&definition)?;
600 let validation = self.validate_tree_state(&definition, &state);
601
602 if validation.issues.is_empty() {
603 return Ok(RuntimeQueryResult::ok_records(
604 raw_query.to_string(),
605 vec![
606 "ok".to_string(),
607 "code".to_string(),
608 "message".to_string(),
609 "entity_id".to_string(),
610 ],
611 vec![vec![
612 ("ok".to_string(), Value::Boolean(true)),
613 ("code".to_string(), Value::text("ok".to_string())),
614 (
615 "message".to_string(),
616 Value::text("tree is valid".to_string()),
617 ),
618 ("entity_id".to_string(), Value::Null),
619 ]],
620 "select",
621 ));
622 }
623
624 let rows = validation
625 .issues
626 .iter()
627 .map(|issue| {
628 vec![
629 ("ok".to_string(), Value::Boolean(false)),
630 ("code".to_string(), Value::text(issue.code.to_string())),
631 ("message".to_string(), Value::text(issue.message.clone())),
632 (
633 "entity_id".to_string(),
634 issue
635 .entity_id
636 .map(|entity_id| Value::UnsignedInteger(entity_id.raw()))
637 .unwrap_or(Value::Null),
638 ),
639 ]
640 })
641 .collect();
642 Ok(RuntimeQueryResult::ok_records(
643 raw_query.to_string(),
644 vec![
645 "ok".to_string(),
646 "code".to_string(),
647 "message".to_string(),
648 "entity_id".to_string(),
649 ],
650 rows,
651 "select",
652 ))
653 }
654
655 fn execute_tree_rebalance(
656 &self,
657 raw_query: &str,
658 collection: &str,
659 tree_name: &str,
660 dry_run: bool,
661 ) -> RedDBResult<RuntimeQueryResult> {
662 let definition = self.require_tree_definition(collection, tree_name)?;
663 let state = self.load_tree_state(&definition)?;
664 self.ensure_tree_operable(&definition, &state)?;
665 let plan = self.plan_tree_rebalance(&definition, &state)?;
666
667 if !dry_run {
668 let existing_edges: Vec<EntityId> = state.structural_edges.keys().copied().collect();
669 for edge_id in existing_edges {
670 let _ = self.delete_entity_internal(collection, edge_id)?;
671 }
672
673 for row in &plan {
674 self.create_edge_unchecked(self.build_tree_structural_edge_input(
675 collection,
676 tree_name,
677 row.new_parent_id,
678 row.node_id,
679 row.new_index,
680 ))?;
681 }
682
683 self.touch_tree_definition_timestamp(&definition)?;
684 self.note_table_write(collection);
685 }
686
687 let rows = plan
688 .iter()
689 .map(|row| {
690 vec![
691 (
692 "node_id".to_string(),
693 Value::UnsignedInteger(row.node_id.raw()),
694 ),
695 (
696 "old_parent_id".to_string(),
697 Value::UnsignedInteger(row.old_parent_id.raw()),
698 ),
699 (
700 "new_parent_id".to_string(),
701 Value::UnsignedInteger(row.new_parent_id.raw()),
702 ),
703 (
704 "old_index".to_string(),
705 Value::UnsignedInteger(row.old_index as u64),
706 ),
707 (
708 "new_index".to_string(),
709 Value::UnsignedInteger(row.new_index as u64),
710 ),
711 (
712 "old_depth".to_string(),
713 Value::UnsignedInteger(row.old_depth as u64),
714 ),
715 (
716 "new_depth".to_string(),
717 Value::UnsignedInteger(row.new_depth as u64),
718 ),
719 ("changed".to_string(), Value::Boolean(row.changed)),
720 ]
721 })
722 .collect();
723
724 Ok(RuntimeQueryResult::ok_records(
725 raw_query.to_string(),
726 vec![
727 "node_id".to_string(),
728 "old_parent_id".to_string(),
729 "new_parent_id".to_string(),
730 "old_index".to_string(),
731 "new_index".to_string(),
732 "old_depth".to_string(),
733 "new_depth".to_string(),
734 "changed".to_string(),
735 ],
736 rows,
737 "select",
738 ))
739 }
740
741 fn build_tree_create_node_input(
742 &self,
743 collection: &str,
744 tree_name: &str,
745 node: &TreeNodeSpec,
746 allow_max_children: bool,
747 ) -> RedDBResult<crate::application::CreateNodeInput> {
748 if allow_max_children && node.max_children == Some(0) {
749 return Err(RedDBError::Query(
750 "node max children must be positive".to_string(),
751 ));
752 }
753 self.ensure_tree_spec_is_user_safe(node)?;
754 let mut metadata = build_tree_node_metadata(tree_name, &node.metadata)?;
755 if allow_max_children {
756 if let Some(max_children) = node.max_children {
757 metadata.push((
758 TREE_METADATA_MAX_CHILDREN.to_string(),
759 MetadataValue::Int(max_children as i64),
760 ));
761 }
762 }
763 Ok(crate::application::CreateNodeInput {
764 collection: collection.to_string(),
765 label: node.label.clone(),
766 node_type: node.node_type.clone(),
767 properties: node.properties.clone(),
768 metadata,
769 embeddings: Vec::new(),
770 table_links: Vec::new(),
771 node_links: Vec::new(),
772 })
773 }
774
775 fn build_tree_structural_edge_input(
776 &self,
777 collection: &str,
778 tree_name: &str,
779 parent_id: EntityId,
780 child_id: EntityId,
781 child_index: usize,
782 ) -> crate::application::CreateEdgeInput {
783 crate::application::CreateEdgeInput {
784 collection: collection.to_string(),
785 label: TREE_CHILD_EDGE_LABEL.to_string(),
786 from: parent_id,
787 to: child_id,
788 weight: Some(1.0),
789 properties: Vec::new(),
790 metadata: vec![
791 (
792 TREE_METADATA_NAME.to_string(),
793 MetadataValue::String(tree_name.to_string()),
794 ),
795 (
796 TREE_METADATA_CHILD_INDEX.to_string(),
797 MetadataValue::Int(child_index as i64),
798 ),
799 ],
800 }
801 }
802
803 fn ensure_tree_spec_is_user_safe(&self, node: &TreeNodeSpec) -> RedDBResult<()> {
804 for (key, _) in &node.metadata {
805 if key.starts_with(TREE_METADATA_PREFIX) {
806 return Err(RedDBError::Query(format!(
807 "metadata key '{}' is reserved for managed trees",
808 key
809 )));
810 }
811 }
812 Ok(())
813 }
814
815 fn require_tree_definition(
816 &self,
817 collection: &str,
818 tree_name: &str,
819 ) -> RedDBResult<crate::physical::PhysicalTreeDefinition> {
820 self.inner
821 .db
822 .tree_definition(collection, tree_name)
823 .ok_or_else(|| {
824 RedDBError::NotFound(format!("tree '{}.{}' not found", collection, tree_name))
825 })
826 }
827
828 fn touch_tree_definition_timestamp(
829 &self,
830 definition: &crate::physical::PhysicalTreeDefinition,
831 ) -> RedDBResult<()> {
832 let mut updated = definition.clone();
833 updated.updated_at_unix_ms = current_tree_unix_ms();
834 self.inner
835 .db
836 .save_tree_definition(updated)
837 .map_err(|err| RedDBError::Internal(err.to_string()))?;
838 self.inner
839 .db
840 .persist_metadata()
841 .map_err(|err| RedDBError::Internal(err.to_string()))?;
842 Ok(())
843 }
844
845 fn ensure_tree_root_metadata(
846 &self,
847 collection: &str,
848 root_id: EntityId,
849 tree_name: &str,
850 max_children_override: Option<usize>,
851 ) -> RedDBResult<()> {
852 let store = self.inner.db.store();
853 let mut metadata = store.get_metadata(collection, root_id).unwrap_or_default();
854 metadata.set(
855 TREE_METADATA_NAME.to_string(),
856 MetadataValue::String(tree_name.to_string()),
857 );
858 match max_children_override {
859 Some(value) => metadata.set(
860 TREE_METADATA_MAX_CHILDREN.to_string(),
861 MetadataValue::Int(value as i64),
862 ),
863 None => {
864 metadata.remove(TREE_METADATA_MAX_CHILDREN);
865 }
866 }
867 store
868 .set_metadata(collection, root_id, metadata)
869 .map_err(|err| RedDBError::Internal(err.to_string()))
870 }
871
872 fn load_tree_state(
873 &self,
874 definition: &crate::physical::PhysicalTreeDefinition,
875 ) -> RedDBResult<TreeState> {
876 let store = self.inner.db.store();
877 let Some(manager) = store.get_collection(&definition.collection) else {
878 return Ok(TreeState {
879 nodes: BTreeMap::new(),
880 structural_edges: BTreeMap::new(),
881 children_by_parent: BTreeMap::new(),
882 parent_by_child: BTreeMap::new(),
883 });
884 };
885
886 let mut all_nodes = BTreeMap::new();
887 let mut candidate_links = Vec::new();
888 let mut member_node_ids = BTreeSet::new();
889 let root_id = EntityId::new(definition.root_id);
890
891 for entity in manager.query_all(|entity| {
892 matches!(
893 entity.kind,
894 EntityKind::GraphNode(_) | EntityKind::GraphEdge(_)
895 )
896 }) {
897 let metadata = store
898 .get_metadata(&definition.collection, entity.id)
899 .unwrap_or_default();
900 match &entity.kind {
901 EntityKind::GraphNode(_) => {
902 if metadata_tree_name(&metadata) == Some(definition.name.as_str()) {
903 member_node_ids.insert(entity.id);
904 }
905 all_nodes.insert(
906 entity.id,
907 TreeNodeState {
908 max_children_override: metadata_usize(
909 &metadata,
910 TREE_METADATA_MAX_CHILDREN,
911 ),
912 metadata,
913 },
914 );
915 }
916 EntityKind::GraphEdge(edge_kind) if edge_kind.label == TREE_CHILD_EDGE_LABEL => {
917 let Some(parent_id) = parse_tree_entity_id_text(&edge_kind.from_node) else {
918 continue;
919 };
920 let Some(child_id) = parse_tree_entity_id_text(&edge_kind.to_node) else {
921 continue;
922 };
923 if metadata_tree_name(&metadata) == Some(definition.name.as_str()) {
924 member_node_ids.insert(parent_id);
925 member_node_ids.insert(child_id);
926 }
927 let child_index =
928 metadata_usize(&metadata, TREE_METADATA_CHILD_INDEX).unwrap_or(usize::MAX);
929 candidate_links.push(TreeChildLink {
930 edge_id: entity.id,
931 parent_id,
932 child_id,
933 child_index,
934 });
935 }
936 _ => {}
937 }
938 }
939
940 let mut candidate_children_by_parent: BTreeMap<EntityId, Vec<TreeChildLink>> =
941 BTreeMap::new();
942 for link in &candidate_links {
943 candidate_children_by_parent
944 .entry(link.parent_id)
945 .or_default()
946 .push(link.clone());
947 }
948
949 for links in candidate_children_by_parent.values_mut() {
950 links.sort_by(|left, right| {
951 left.child_index
952 .cmp(&right.child_index)
953 .then_with(|| left.edge_id.cmp(&right.edge_id))
954 .then_with(|| left.child_id.cmp(&right.child_id))
955 });
956 for (derived_index, link) in links.iter_mut().enumerate() {
957 link.child_index = derived_index;
958 }
959 }
960
961 let mut queue = VecDeque::new();
962 queue.push_back(root_id);
963 while let Some(node_id) = queue.pop_front() {
964 if !member_node_ids.insert(node_id) {
965 continue;
966 }
967 if let Some(children) = candidate_children_by_parent.get(&node_id) {
968 for child in children {
969 member_node_ids.insert(child.parent_id);
970 member_node_ids.insert(child.child_id);
971 queue.push_back(child.child_id);
972 }
973 }
974 }
975
976 let mut nodes = BTreeMap::new();
977 for node_id in &member_node_ids {
978 if let Some(node) = all_nodes.get(node_id) {
979 nodes.insert(*node_id, node.clone());
980 }
981 }
982
983 let mut structural_edges = BTreeMap::new();
984 let mut children_by_parent: BTreeMap<EntityId, Vec<TreeChildLink>> = BTreeMap::new();
985 let mut parent_by_child = BTreeMap::new();
986 for links in candidate_children_by_parent.values() {
987 for link in links {
988 if !(member_node_ids.contains(&link.parent_id)
989 && member_node_ids.contains(&link.child_id))
990 {
991 continue;
992 }
993 structural_edges.insert(link.edge_id, link.clone());
994 children_by_parent
995 .entry(link.parent_id)
996 .or_default()
997 .push(link.clone());
998 parent_by_child.insert(link.child_id, link.clone());
999 }
1000 }
1001
1002 for links in children_by_parent.values_mut() {
1003 links.sort_by(|left, right| {
1004 left.child_index
1005 .cmp(&right.child_index)
1006 .then_with(|| left.child_id.cmp(&right.child_id))
1007 });
1008 }
1009
1010 Ok(TreeState {
1011 nodes,
1012 structural_edges,
1013 children_by_parent,
1014 parent_by_child,
1015 })
1016 }
1017
1018 fn validate_tree_state(
1019 &self,
1020 definition: &crate::physical::PhysicalTreeDefinition,
1021 state: &TreeState,
1022 ) -> TreeValidation {
1023 let mut issues = Vec::new();
1024 let root_id = EntityId::new(definition.root_id);
1025 if !state.nodes.contains_key(&root_id) {
1026 issues.push(TreeIssue {
1027 code: "missing_root",
1028 message: format!(
1029 "root node '{}' is missing from tree '{}.{}'",
1030 definition.root_id, definition.collection, definition.name
1031 ),
1032 entity_id: Some(root_id),
1033 });
1034 }
1035
1036 for (node_id, node) in &state.nodes {
1037 match metadata_tree_name(&node.metadata) {
1038 Some(name) if name == definition.name => {}
1039 Some(_) => issues.push(TreeIssue {
1040 code: "node_tree_name_mismatch",
1041 message: format!(
1042 "node '{}' is part of tree '{}.{}' but is missing '{}' metadata",
1043 node_id.raw(),
1044 definition.collection,
1045 definition.name,
1046 TREE_METADATA_NAME
1047 ),
1048 entity_id: Some(*node_id),
1049 }),
1050 None => {}
1051 }
1052 if metadata_invalid_max_children(&node.metadata) {
1053 issues.push(TreeIssue {
1054 code: "invalid_max_children",
1055 message: format!(
1056 "node '{}' has invalid '{}' metadata",
1057 node_id.raw(),
1058 TREE_METADATA_MAX_CHILDREN
1059 ),
1060 entity_id: Some(*node_id),
1061 });
1062 }
1063 }
1064
1065 let mut parent_counts: BTreeMap<EntityId, usize> = BTreeMap::new();
1066 for link in state.structural_edges.values() {
1067 parent_counts
1068 .entry(link.child_id)
1069 .and_modify(|count| *count += 1)
1070 .or_insert(1);
1071 if !state.nodes.contains_key(&link.parent_id) {
1072 issues.push(TreeIssue {
1073 code: "missing_parent_node",
1074 message: format!(
1075 "structural edge '{}' references missing parent node '{}'",
1076 link.edge_id.raw(),
1077 link.parent_id.raw()
1078 ),
1079 entity_id: Some(link.edge_id),
1080 });
1081 }
1082 if !state.nodes.contains_key(&link.child_id) {
1083 issues.push(TreeIssue {
1084 code: "missing_child_node",
1085 message: format!(
1086 "structural edge '{}' references missing child node '{}'",
1087 link.edge_id.raw(),
1088 link.child_id.raw()
1089 ),
1090 entity_id: Some(link.edge_id),
1091 });
1092 }
1093 }
1094
1095 for (child_id, count) in parent_counts {
1096 if count > 1 {
1097 issues.push(TreeIssue {
1098 code: "multiple_parents",
1099 message: format!("node '{}' has multiple structural parents", child_id.raw()),
1100 entity_id: Some(child_id),
1101 });
1102 }
1103 }
1104
1105 if let Some(link) = state.parent_by_child.get(&root_id) {
1106 issues.push(TreeIssue {
1107 code: "root_has_parent",
1108 message: format!(
1109 "root node '{}' has a structural parent '{}'",
1110 root_id.raw(),
1111 link.parent_id.raw()
1112 ),
1113 entity_id: Some(root_id),
1114 });
1115 }
1116
1117 for node_id in state.nodes.keys() {
1118 if *node_id == root_id {
1119 continue;
1120 }
1121 if !state.parent_by_child.contains_key(node_id) {
1122 issues.push(TreeIssue {
1123 code: "missing_parent",
1124 message: format!(
1125 "node '{}' is disconnected from root '{}'",
1126 node_id.raw(),
1127 root_id.raw()
1128 ),
1129 entity_id: Some(*node_id),
1130 });
1131 }
1132 }
1133
1134 for (parent_id, links) in &state.children_by_parent {
1135 let mut seen = BTreeSet::new();
1136 for (expected_index, link) in links.iter().enumerate() {
1137 if !seen.insert(link.child_index) {
1138 issues.push(TreeIssue {
1139 code: "duplicate_child_index",
1140 message: format!(
1141 "parent '{}' has duplicate child index '{}'",
1142 parent_id.raw(),
1143 link.child_index
1144 ),
1145 entity_id: Some(link.edge_id),
1146 });
1147 }
1148 if link.child_index != expected_index {
1149 issues.push(TreeIssue {
1150 code: "non_compact_child_index",
1151 message: format!(
1152 "parent '{}' children must use compact indexes starting at zero",
1153 parent_id.raw()
1154 ),
1155 entity_id: Some(link.edge_id),
1156 });
1157 break;
1158 }
1159 }
1160
1161 if let Ok(limit) = effective_max_children(definition, state, *parent_id) {
1162 if links.len() > limit {
1163 issues.push(TreeIssue {
1164 code: "max_children_exceeded",
1165 message: format!(
1166 "parent '{}' has {} children but limit is {}",
1167 parent_id.raw(),
1168 links.len(),
1169 limit
1170 ),
1171 entity_id: Some(*parent_id),
1172 });
1173 }
1174 }
1175 }
1176
1177 let mut depths = BTreeMap::new();
1178 let mut visited = BTreeSet::new();
1179 let mut stack = BTreeSet::new();
1180 if state.nodes.contains_key(&root_id) {
1181 validate_tree_cycles(
1182 root_id,
1183 0,
1184 state,
1185 &mut visited,
1186 &mut stack,
1187 &mut depths,
1188 &mut issues,
1189 );
1190 }
1191
1192 for node_id in state.nodes.keys() {
1193 if !depths.contains_key(node_id) {
1194 issues.push(TreeIssue {
1195 code: "unreachable_node",
1196 message: format!(
1197 "node '{}' is not reachable from root '{}'",
1198 node_id.raw(),
1199 root_id.raw()
1200 ),
1201 entity_id: Some(*node_id),
1202 });
1203 }
1204 }
1205
1206 TreeValidation { issues, depths }
1207 }
1208
1209 fn ensure_tree_operable(
1210 &self,
1211 definition: &crate::physical::PhysicalTreeDefinition,
1212 state: &TreeState,
1213 ) -> RedDBResult<()> {
1214 let validation = self.validate_tree_state(definition, state);
1215 let blocking_issue = validation.issues.into_iter().find(|issue| {
1216 !matches!(
1217 issue.code,
1218 "duplicate_child_index" | "non_compact_child_index"
1219 )
1220 });
1221 if let Some(issue) = blocking_issue {
1222 return Err(RedDBError::Query(format!(
1223 "tree '{}.{}' is not operable: {}",
1224 definition.collection, definition.name, issue.message
1225 )));
1226 }
1227 Ok(())
1228 }
1229
1230 fn rewrite_parent_children(
1231 &self,
1232 collection: &str,
1233 tree_name: &str,
1234 parent_id: EntityId,
1235 child_ids: &[EntityId],
1236 current_state: &TreeState,
1237 ) -> RedDBResult<()> {
1238 if let Some(existing_links) = current_state.children_by_parent.get(&parent_id) {
1239 for link in existing_links {
1240 let _ = self.delete_entity_internal(collection, link.edge_id)?;
1241 }
1242 }
1243 for (index, child_id) in child_ids.iter().enumerate() {
1244 self.create_edge_unchecked(self.build_tree_structural_edge_input(
1245 collection, tree_name, parent_id, *child_id, index,
1246 ))?;
1247 }
1248 Ok(())
1249 }
1250
1251 fn graph_edges_touching_nodes(
1252 &self,
1253 collection: &str,
1254 nodes: &BTreeSet<EntityId>,
1255 ) -> RedDBResult<BTreeSet<EntityId>> {
1256 let store = self.inner.db.store();
1257 let Some(manager) = store.get_collection(collection) else {
1258 return Ok(BTreeSet::new());
1259 };
1260 let mut edge_ids = BTreeSet::new();
1261 for entity in manager.query_all(|entity| matches!(entity.kind, EntityKind::GraphEdge(_))) {
1262 let EntityKind::GraphEdge(edge_kind) = &entity.kind else {
1263 continue;
1264 };
1265 let Some(from_id) = parse_tree_entity_id_text(&edge_kind.from_node) else {
1266 continue;
1267 };
1268 let Some(to_id) = parse_tree_entity_id_text(&edge_kind.to_node) else {
1269 continue;
1270 };
1271 if nodes.contains(&from_id) || nodes.contains(&to_id) {
1272 edge_ids.insert(entity.id);
1273 }
1274 }
1275 Ok(edge_ids)
1276 }
1277
1278 fn delete_entity_internal(&self, collection: &str, id: EntityId) -> RedDBResult<bool> {
1279 let store = self.inner.db.store();
1280 let deleted = store
1281 .delete(collection, id)
1282 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1283 if deleted {
1284 store.context_index().remove_entity(id);
1285 self.cdc_emit(
1286 crate::replication::cdc::ChangeOperation::Delete,
1287 collection,
1288 id.raw(),
1289 "entity",
1290 );
1291 }
1292 Ok(deleted)
1293 }
1294
1295 fn plan_tree_rebalance(
1296 &self,
1297 definition: &crate::physical::PhysicalTreeDefinition,
1298 state: &TreeState,
1299 ) -> RedDBResult<Vec<TreeRebalancePlanRow>> {
1300 let validation = self.validate_tree_state(definition, state);
1301 let root_id = EntityId::new(definition.root_id);
1302 if !validation.depths.contains_key(&root_id) {
1303 return Err(RedDBError::Query(format!(
1304 "tree '{}.{}' is missing its root '{}'",
1305 definition.collection, definition.name, definition.root_id
1306 )));
1307 }
1308
1309 let ordered = flatten_tree_preorder(state, root_id);
1310 let mut assignment_queue = VecDeque::new();
1311 assignment_queue.push_back(root_id);
1312 let mut assigned_children_count: BTreeMap<EntityId, usize> = BTreeMap::new();
1313 let mut new_depths = BTreeMap::new();
1314 new_depths.insert(root_id, 0usize);
1315
1316 let mut plan = Vec::with_capacity(ordered.len());
1317 for node_id in ordered {
1318 loop {
1319 let Some(candidate_parent) = assignment_queue.front().copied() else {
1320 return Err(RedDBError::Query(format!(
1321 "tree '{}.{}' does not have enough aggregate capacity to rebalance",
1322 definition.collection, definition.name
1323 )));
1324 };
1325 let assigned = assigned_children_count
1326 .get(&candidate_parent)
1327 .copied()
1328 .unwrap_or(0);
1329 let limit = effective_max_children(definition, state, candidate_parent)?;
1330 if assigned < limit {
1331 break;
1332 }
1333 assignment_queue.pop_front();
1334 }
1335
1336 let parent_id = assignment_queue.front().copied().ok_or_else(|| {
1337 RedDBError::Query(format!(
1338 "tree '{}.{}' does not have enough aggregate capacity to rebalance",
1339 definition.collection, definition.name
1340 ))
1341 })?;
1342 let entry = assigned_children_count.entry(parent_id).or_insert(0);
1343 let new_index = *entry;
1344 *entry += 1;
1345 let parent_depth = new_depths.get(&parent_id).copied().unwrap_or(0);
1346 new_depths.insert(node_id, parent_depth + 1);
1347 assignment_queue.push_back(node_id);
1348
1349 let old_link = state.parent_by_child.get(&node_id).ok_or_else(|| {
1350 RedDBError::Query(format!(
1351 "node '{}' is missing a structural parent in tree '{}.{}'",
1352 node_id.raw(),
1353 definition.collection,
1354 definition.name
1355 ))
1356 })?;
1357 let old_depth = validation.depths.get(&node_id).copied().unwrap_or(0);
1358 let new_depth = new_depths.get(&node_id).copied().unwrap_or(0);
1359 let changed = old_link.parent_id != parent_id
1360 || old_link.child_index != new_index
1361 || old_depth != new_depth;
1362
1363 plan.push(TreeRebalancePlanRow {
1364 node_id,
1365 old_parent_id: old_link.parent_id,
1366 new_parent_id: parent_id,
1367 old_index: old_link.child_index,
1368 new_index,
1369 old_depth,
1370 new_depth,
1371 changed,
1372 });
1373 }
1374
1375 Ok(plan)
1376 }
1377}
1378
1379fn current_tree_unix_ms() -> u128 {
1380 SystemTime::now()
1381 .duration_since(UNIX_EPOCH)
1382 .unwrap_or_default()
1383 .as_millis()
1384}
1385
1386fn build_tree_node_metadata(
1387 tree_name: &str,
1388 metadata_entries: &[(String, Value)],
1389) -> RedDBResult<Vec<(String, MetadataValue)>> {
1390 let mut metadata = Vec::with_capacity(metadata_entries.len() + 1);
1391 metadata.push((
1392 TREE_METADATA_NAME.to_string(),
1393 MetadataValue::String(tree_name.to_string()),
1394 ));
1395 for (key, value) in metadata_entries {
1396 metadata.push((key.clone(), storage_value_to_tree_metadata(value)?));
1397 }
1398 Ok(metadata)
1399}
1400
1401fn storage_value_to_tree_metadata(value: &Value) -> RedDBResult<MetadataValue> {
1402 Ok(match value {
1403 Value::Null => MetadataValue::Null,
1404 Value::Boolean(value) => MetadataValue::Bool(*value),
1405 Value::Integer(value) => MetadataValue::Int(*value),
1406 Value::UnsignedInteger(value) => {
1407 if *value <= i64::MAX as u64 {
1408 MetadataValue::Int(*value as i64)
1409 } else {
1410 MetadataValue::Timestamp(*value)
1411 }
1412 }
1413 Value::Float(value) => MetadataValue::Float(*value),
1414 Value::Text(value) => MetadataValue::String(value.to_string()),
1415 Value::Blob(value) => MetadataValue::Bytes(value.clone()),
1416 Value::Timestamp(value) => {
1417 if *value < 0 {
1418 return Err(RedDBError::Query(
1419 "negative timestamps are not supported in tree metadata".to_string(),
1420 ));
1421 }
1422 MetadataValue::Timestamp(*value as u64)
1423 }
1424 Value::Json(bytes) => {
1425 let json = crate::json::from_slice::<crate::json::Value>(bytes).map_err(|err| {
1426 RedDBError::Query(format!("failed to decode JSON metadata value: {err}"))
1427 })?;
1428 json_to_metadata_value(&json)?
1429 }
1430 Value::Vector(values) => MetadataValue::Array(
1431 values
1432 .iter()
1433 .map(|value| MetadataValue::Float(*value as f64))
1434 .collect(),
1435 ),
1436 Value::Array(values) => MetadataValue::Array(
1437 values
1438 .iter()
1439 .map(storage_value_to_tree_metadata)
1440 .collect::<RedDBResult<Vec<_>>>()?,
1441 ),
1442 other => MetadataValue::String(format!("{other:?}")),
1443 })
1444}
1445
1446fn resolve_tree_insert_position(position: TreePosition, len: usize) -> RedDBResult<usize> {
1447 match position {
1448 TreePosition::First => Ok(0),
1449 TreePosition::Last => Ok(len),
1450 TreePosition::Index(index) if index <= len => Ok(index),
1451 TreePosition::Index(index) => Err(RedDBError::Query(format!(
1452 "tree child position {} is out of bounds for {} children",
1453 index, len
1454 ))),
1455 }
1456}
1457
1458fn child_id_list(
1459 children_by_parent: &BTreeMap<EntityId, Vec<TreeChildLink>>,
1460 parent_id: EntityId,
1461) -> Vec<EntityId> {
1462 children_by_parent
1463 .get(&parent_id)
1464 .map(|links| links.iter().map(|link| link.child_id).collect())
1465 .unwrap_or_default()
1466}
1467
1468fn subtree_ids(state: &TreeState, root_id: EntityId) -> BTreeSet<EntityId> {
1469 let mut ids = BTreeSet::new();
1470 let mut queue = VecDeque::new();
1471 queue.push_back(root_id);
1472 while let Some(node_id) = queue.pop_front() {
1473 if !ids.insert(node_id) {
1474 continue;
1475 }
1476 if let Some(children) = state.children_by_parent.get(&node_id) {
1477 for child in children {
1478 queue.push_back(child.child_id);
1479 }
1480 }
1481 }
1482 ids
1483}
1484
1485fn flatten_tree_preorder(state: &TreeState, root_id: EntityId) -> Vec<EntityId> {
1486 let mut ordered = Vec::new();
1487 flatten_tree_preorder_inner(state, root_id, &mut ordered);
1488 ordered
1489}
1490
1491fn flatten_tree_preorder_inner(state: &TreeState, current: EntityId, ordered: &mut Vec<EntityId>) {
1492 if let Some(children) = state.children_by_parent.get(¤t) {
1493 for child in children {
1494 ordered.push(child.child_id);
1495 flatten_tree_preorder_inner(state, child.child_id, ordered);
1496 }
1497 }
1498}
1499
1500fn validate_tree_cycles(
1501 node_id: EntityId,
1502 depth: usize,
1503 state: &TreeState,
1504 visited: &mut BTreeSet<EntityId>,
1505 stack: &mut BTreeSet<EntityId>,
1506 depths: &mut BTreeMap<EntityId, usize>,
1507 issues: &mut Vec<TreeIssue>,
1508) {
1509 if stack.contains(&node_id) {
1510 issues.push(TreeIssue {
1511 code: "cycle_detected",
1512 message: format!("cycle detected at node '{}'", node_id.raw()),
1513 entity_id: Some(node_id),
1514 });
1515 return;
1516 }
1517 if !visited.insert(node_id) {
1518 return;
1519 }
1520 stack.insert(node_id);
1521 depths.insert(node_id, depth);
1522 if let Some(children) = state.children_by_parent.get(&node_id) {
1523 for child in children {
1524 validate_tree_cycles(
1525 child.child_id,
1526 depth + 1,
1527 state,
1528 visited,
1529 stack,
1530 depths,
1531 issues,
1532 );
1533 }
1534 }
1535 stack.remove(&node_id);
1536}
1537
1538fn effective_max_children(
1539 definition: &crate::physical::PhysicalTreeDefinition,
1540 state: &TreeState,
1541 node_id: EntityId,
1542) -> RedDBResult<usize> {
1543 let Some(node) = state.nodes.get(&node_id) else {
1544 return Err(RedDBError::NotFound(format!(
1545 "node '{}' was not found in tree '{}.{}'",
1546 node_id.raw(),
1547 definition.collection,
1548 definition.name
1549 )));
1550 };
1551 if let Some(value) = node.max_children_override {
1552 if value == 0 {
1553 return Err(RedDBError::Query(format!(
1554 "node '{}' has non-positive max children override",
1555 node_id.raw()
1556 )));
1557 }
1558 return Ok(value);
1559 }
1560 Ok(definition.default_max_children)
1561}
1562
1563fn metadata_tree_name(metadata: &Metadata) -> Option<&str> {
1564 match metadata.get(TREE_METADATA_NAME) {
1565 Some(MetadataValue::String(value)) => Some(value.as_str()),
1566 _ => None,
1567 }
1568}
1569
1570fn metadata_usize(metadata: &Metadata, key: &str) -> Option<usize> {
1571 match metadata.get(key) {
1572 Some(MetadataValue::Int(value)) if *value >= 0 => Some(*value as usize),
1573 Some(MetadataValue::Timestamp(value)) => (*value).try_into().ok(),
1574 _ => None,
1575 }
1576}
1577
1578fn metadata_invalid_max_children(metadata: &Metadata) -> bool {
1579 match metadata.get(TREE_METADATA_MAX_CHILDREN) {
1580 None => false,
1581 Some(MetadataValue::Int(value)) => *value <= 0,
1582 Some(MetadataValue::Timestamp(value)) => *value == 0,
1583 _ => true,
1584 }
1585}
1586
1587fn parse_tree_entity_id_text(value: &str) -> Option<EntityId> {
1588 value
1589 .strip_prefix('e')
1590 .unwrap_or(value)
1591 .parse::<u64>()
1592 .ok()
1593 .filter(|value| *value > 0)
1594 .map(EntityId::new)
1595}