Skip to main content

teaql_runtime/repository/
graph.rs

1use std::collections::BTreeMap;
2
3use teaql_core::{
4    DeleteCommand, Entity, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record,
5    SelectQuery, UpdateCommand, Value,
6};
7use teaql_sql::SqlDialect;
8
9use crate::{
10    CommentTrack, GraphMutationKind, GraphMutationPlan, GraphNode, GraphOperation, RepositoryError,
11    RuntimeError, ScopedCommentNode, sorted_update_fields,
12};
13
14use super::{GraphTransactionBoundary, QueryExecutor, ResolvedRepository, helpers::*};
15
16impl<'a, D, E> ResolvedRepository<'a, D, E>
17where
18    D: SqlDialect,
19    E: QueryExecutor,
20{
21    pub fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
22        if node.entity != self.entity {
23            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
24                "resolved repository {} cannot save graph root {}",
25                self.entity, node.entity
26            ))));
27        }
28        let boundary = self
29            .repository
30            .executor
31            .begin_transaction()
32            .map_err(RepositoryError::Executor)?;
33        if matches!(boundary, GraphTransactionBoundary::Unsupported) {
34            return Err(RepositoryError::Runtime(RuntimeError::Graph(
35                "save_graph requires a transactional executor".to_owned(),
36            )));
37        }
38        let result = self.upsert_graph_node_scoped(node, None);
39        match result {
40            Ok(saved) => {
41                if matches!(boundary, GraphTransactionBoundary::Started) {
42                    self.repository
43                        .executor
44                        .commit_transaction()
45                        .map_err(RepositoryError::Executor)?;
46                }
47                Ok(saved)
48            }
49            Err(err) => {
50                if !matches!(boundary, GraphTransactionBoundary::Unsupported) {
51                    self.repository
52                        .executor
53                        .rollback_transaction()
54                        .map_err(RepositoryError::Executor)?;
55                }
56                Err(err)
57            }
58        }
59    }
60
61
62    pub fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
63    where
64        T: Entity,
65    {
66        let node = self
67            .graph_node_from_entity(entity)
68            .map_err(RepositoryError::Runtime)?;
69        self.save_graph(node)
70    }
71
72    pub fn save_entity_graph_with_comment<T>(
73        &self,
74        entity: T,
75        comment: impl Into<String>,
76    ) -> Result<GraphNode, RepositoryError<E::Error>>
77    where
78        T: Entity,
79    {
80        let mut node = self
81            .graph_node_from_entity(entity)
82            .map_err(RepositoryError::Runtime)?;
83        node.set_comment(comment);
84        self.save_graph(node)
85    }
86
87    /// Create a new entity graph with an annotation comment on the root node.
88    /// This assumes all new nodes do not exist in the database, skipping existence checks
89    /// and throwing an exception on primary key conflict.
90    pub fn create_entity_graph_with_comment<T>(
91        &self,
92        entity: T,
93        comment: impl Into<String>,
94    ) -> Result<GraphNode, RepositoryError<E::Error>>
95    where
96        T: Entity,
97    {
98        let mut node = self
99            .graph_node_from_entity(entity)
100            .map_err(RepositoryError::Runtime)?;
101        node.operation = GraphOperation::Create;
102        node.set_comment(comment);
103        self.save_graph(node)
104    }
105
106    pub fn plan_graph(
107        &self,
108        node: GraphNode,
109    ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
110        if node.entity != self.entity {
111            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
112                "resolved repository {} cannot plan graph root {}",
113                self.entity, node.entity
114            ))));
115        }
116        let mut node = node;
117        let mut plan = GraphMutationPlan::default();
118        self.collect_graph_plan(&mut node, &mut plan, None, false)?;
119        plan.planned_root = Some(node);
120        plan.rebuild_batches();
121        Ok(plan)
122    }
123
124    pub fn execute_graph_plan(
125        &self,
126        plan: GraphMutationPlan,
127    ) -> Result<GraphNode, RepositoryError<E::Error>> {
128        let Some(root) = plan.planned_root else {
129            return Err(RepositoryError::Runtime(RuntimeError::Graph(
130                "graph mutation plan has no planned root".to_owned(),
131            )));
132        };
133
134        self.upsert_graph_node_scoped(root, None)
135    }
136
137    pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
138    where
139        T: Entity,
140    {
141        let descriptor = T::entity_descriptor();
142        if descriptor.name != self.entity {
143            return Err(RuntimeError::Graph(format!(
144                "resolved repository {} cannot extract graph root {}",
145                self.entity, descriptor.name
146            )));
147        }
148        self.graph_node_from_record(&descriptor.name, entity.into_record())
149    }
150
151    fn collect_graph_plan<'s>(
152        &self,
153        node: &mut GraphNode,
154        plan: &mut GraphMutationPlan,
155        parent_scope: Option<&'s ScopedCommentNode<'s>>,
156        parent_is_create: bool,
157    ) -> Result<(), RepositoryError<E::Error>> {
158        match node.operation {
159            GraphOperation::Reference => {
160                plan.push(
161                    node.entity.clone(),
162                    GraphMutationKind::Reference,
163                    node.values.clone(),
164                    Vec::new(),
165                );
166                return Ok(());
167            }
168            GraphOperation::Remove => {
169                plan.push(
170                    node.entity.clone(),
171                    GraphMutationKind::Delete,
172                    node.values.clone(),
173                    Vec::new(),
174                );
175                return Ok(());
176            }
177            GraphOperation::Upsert | GraphOperation::Create => {}
178        }
179
180        let descriptor = self
181            .repository
182            .metadata
183            .context
184            .require_entity(&node.entity)
185            .map_err(RepositoryError::Runtime)?;
186
187        // Create scope node on the current stack frame if this node has a comment
188        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
189            parent: parent_scope,
190            track: CommentTrack {
191                entity_type: node.entity.clone(),
192                entity_id: node
193                    .id()
194                    .map(|v| match v {
195                        Value::U64(n) => n.to_string(),
196                        Value::I64(n) => n.to_string(),
197                        Value::Text(s) => s.clone(),
198                        other => format!("{other:?}"),
199                    })
200                    .unwrap_or_else(|| "(pending)".into()),
201                comment: c.clone(),
202            },
203        });
204        let active_scope = current_scope.as_ref().or(parent_scope);
205
206        let id_property = descriptor.id_property().cloned();
207        let id = id_property.as_ref().and_then(|property| {
208            node.values
209                .get(&property.name)
210                .filter(|value| !is_unassigned_id_value(value))
211                .cloned()
212        });
213
214        let is_create_op = node.operation == GraphOperation::Create || (parent_is_create && node.operation == GraphOperation::Upsert);
215
216        let is_update = if is_create_op {
217            false
218        } else {
219            match (id_property.as_ref(), id.as_ref()) {
220                (Some(id_property), Some(id)) => self
221                    .fetch_graph_current_row(&node.entity, &id_property.name, id, active_scope.map(|s| s.to_lineage_string()))?
222                    .is_some(),
223                _ => false,
224            }
225        };
226        if !is_update {
227            if let Some(id_property) = id_property.as_ref() {
228                let needs_id = !node.values.contains_key(&id_property.name)
229                    || node
230                        .values
231                        .get(&id_property.name)
232                        .is_some_and(is_unassigned_id_value);
233                if needs_id {
234                    let id = self
235                        .repository
236                        .metadata
237                        .context
238                        .next_id(&node.entity)
239                        .map_err(RepositoryError::Runtime)?;
240                    node.values.insert(id_property.name.clone(), Value::U64(id));
241                }
242            }
243            ensure_initial_version(&mut node.values, descriptor);
244        }
245        let update_fields = if is_update {
246            let mut excluded = Vec::new();
247            if let Some(id_property) = id_property.as_ref() {
248                excluded.push(id_property.name.clone());
249            }
250            if let Some(version_property) = descriptor.version_property() {
251                excluded.push(version_property.name.clone());
252            }
253            sorted_update_fields(&node.values, excluded)
254        } else {
255            Vec::new()
256        };
257        plan.push(
258            node.entity.clone(),
259            if is_update {
260                GraphMutationKind::Update
261            } else {
262                GraphMutationKind::Create
263            },
264            node.values.clone(),
265            update_fields,
266        );
267
268        for (name, children) in &mut node.relations {
269            let relation = descriptor.relation_by_name(name).ok_or_else(|| {
270                RepositoryError::Runtime(RuntimeError::MissingRelation {
271                    entity: node.entity.clone(),
272                    relation: name.clone(),
273                })
274            })?;
275            let child_repo = self.scoped_repository(relation.target_entity.clone());
276            for child in children {
277                ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
278                child_repo.collect_graph_plan(child, plan, active_scope, is_create_op)?;
279            }
280        }
281        Ok(())
282    }
283
284    fn insert_graph_node_scoped<'s>(
285        &self,
286        mut node: GraphNode,
287        parent_scope: Option<&'s ScopedCommentNode<'s>>,
288    ) -> Result<GraphNode, RepositoryError<E::Error>> {
289        match node.operation {
290            GraphOperation::Upsert | GraphOperation::Create => {}
291            GraphOperation::Reference => return self.validate_reference_node(node, parent_scope.map(|s| s.to_lineage_string())),
292            GraphOperation::Remove => {
293                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
294                    "create graph cannot remove node {}",
295                    node.entity
296                ))));
297            }
298        }
299
300        // Create scope node on the current stack frame if this node has a comment
301        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
302            parent: parent_scope,
303            track: CommentTrack {
304                entity_type: node.entity.clone(),
305                entity_id: node
306                    .id()
307                    .map(|v| match v {
308                        Value::U64(n) => n.to_string(),
309                        Value::I64(n) => n.to_string(),
310                        Value::Text(s) => s.clone(),
311                        other => format!("{other:?}"),
312                    })
313                    .unwrap_or_else(|| "(pending)".into()),
314                comment: c.clone(),
315            },
316        });
317        let active_scope = current_scope.as_ref().or(parent_scope);
318
319        let descriptor = self
320            .repository
321            .metadata
322            .context
323            .require_entity(&node.entity)
324            .map_err(RepositoryError::Runtime)?;
325
326        let mut one_relations = Vec::new();
327        let mut many_relations = Vec::new();
328        for (name, children) in std::mem::take(&mut node.relations) {
329            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
330                RepositoryError::Runtime(RuntimeError::MissingRelation {
331                    entity: node.entity.clone(),
332                    relation: name.clone(),
333                })
334            })?;
335            if relation.many {
336                many_relations.push((name, relation.clone(), children));
337            } else {
338                one_relations.push((name, relation.clone(), children));
339            }
340        }
341
342        for (name, relation, children) in one_relations {
343            if children.len() > 1 {
344                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
345                    "relation {}.{} expects one child, got {}",
346                    node.entity,
347                    name,
348                    children.len()
349                ))));
350            }
351            let mut saved_children = Vec::new();
352            for child in children {
353                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
354                let child_repo = self.scoped_repository(child.entity.clone());
355                let saved_child = child_repo.insert_graph_node_scoped(child, active_scope)?;
356                if relation.attach {
357                    let foreign_value = saved_child
358                        .values
359                        .get(&relation.foreign_key)
360                        .cloned()
361                        .ok_or_else(|| {
362                            RepositoryError::Runtime(RuntimeError::Graph(format!(
363                                "saved child {} missing foreign key {} for relation {}.{}",
364                                relation.target_entity, relation.foreign_key, node.entity, name
365                            )))
366                        })?;
367                    node.values
368                        .insert(relation.local_key.clone(), foreign_value);
369                }
370                saved_children.push(saved_child);
371            }
372            node.relations.insert(name, saved_children);
373        }
374
375        let command = self
376            .prepare_insert_command(&InsertCommand {
377                entity: node.entity.clone(),
378                values: node.values.clone(),
379            })
380            .map_err(RepositoryError::Runtime)?;
381        let lineage = active_scope.map(|s| s.to_lineage_string());
382        self.execute_prepared_insert_with_comment(command.clone(), lineage)?;
383        node.values = command.values;
384
385        for (name, relation, children) in many_relations {
386            let local_value = node
387                .values
388                .get(&relation.local_key)
389                .cloned()
390                .ok_or_else(|| {
391                    RepositoryError::Runtime(RuntimeError::Graph(format!(
392                        "parent {} missing local key {} for relation {}",
393                        node.entity, relation.local_key, name
394                    )))
395                })?;
396            let mut saved_children = Vec::new();
397            for mut child in children {
398                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
399                if relation.attach {
400                    child
401                        .values
402                        .insert(relation.foreign_key.clone(), local_value.clone());
403                }
404                let child_repo = self.scoped_repository(child.entity.clone());
405                saved_children.push(child_repo.insert_graph_node_scoped(child, active_scope)?);
406            }
407            node.relations.insert(name, saved_children);
408        }
409
410        Ok(node)
411    }
412
413    fn upsert_graph_node_scoped<'s>(
414        &self,
415        mut node: GraphNode,
416        parent_scope: Option<&'s ScopedCommentNode<'s>>,
417    ) -> Result<GraphNode, RepositoryError<E::Error>> {
418        // Create scope node on the current stack frame if this node has a comment
419        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
420            parent: parent_scope,
421            track: CommentTrack {
422                entity_type: node.entity.clone(),
423                entity_id: node
424                    .id()
425                    .map(|v| match v {
426                        Value::U64(n) => n.to_string(),
427                        Value::I64(n) => n.to_string(),
428                        Value::Text(s) => s.clone(),
429                        other => format!("{other:?}"),
430                    })
431                    .unwrap_or_else(|| "(pending)".into()),
432                comment: c.clone(),
433            },
434        });
435        let active_scope = current_scope.as_ref().or(parent_scope);
436
437        match node.operation {
438            GraphOperation::Upsert | GraphOperation::Create => {}
439            GraphOperation::Reference => return self.validate_reference_node(node, active_scope.map(|s| s.to_lineage_string())),
440            GraphOperation::Remove => {
441                self.validate_remove_node(&node, active_scope.map(|s| s.to_lineage_string()))?;
442                self.delete_graph_node(&node, parent_scope)?;
443                return Ok(node);
444            }
445        }
446
447        let descriptor = self
448            .repository
449            .metadata
450            .context
451            .require_entity(&node.entity)
452            .map_err(RepositoryError::Runtime)?;
453        let Some(id_property) = descriptor.id_property() else {
454            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
455                "entity {} has no id property for graph upsert",
456                node.entity
457            ))));
458        };
459        let Some(id) = node
460            .values
461            .get(&id_property.name)
462            .filter(|value| !is_unassigned_id_value(value))
463            .cloned()
464        else {
465            // Strip comment to prevent duplicate scope — already captured in active_scope
466            node.comment = None;
467            return self.insert_graph_node_scoped(node, active_scope);
468        };
469
470        if node.operation == GraphOperation::Create || self
471            .fetch_graph_current_row(&node.entity, &id_property.name, &id, active_scope.map(|s| s.to_lineage_string()))?
472            .is_none()
473        {
474            node.comment = None;
475            return self.insert_graph_node_scoped(node, active_scope);
476        }
477
478        let mut one_relations = Vec::new();
479        let mut many_relations = Vec::new();
480        for (name, children) in std::mem::take(&mut node.relations) {
481            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
482                RepositoryError::Runtime(RuntimeError::MissingRelation {
483                    entity: node.entity.clone(),
484                    relation: name.clone(),
485                })
486            })?;
487            if relation.many {
488                many_relations.push((name, relation.clone(), children));
489            } else {
490                one_relations.push((name, relation.clone(), children));
491            }
492        }
493
494        for (name, relation, children) in one_relations {
495            if children.len() > 1 {
496                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
497                    "relation {}.{} expects one child, got {}",
498                    node.entity,
499                    name,
500                    children.len()
501                ))));
502            }
503            let mut saved_children = Vec::new();
504            for child in children {
505                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
506                let child_repo = self.scoped_repository(child.entity.clone());
507                let saved_child = child_repo.upsert_graph_node_scoped(child, active_scope)?;
508                if relation.attach {
509                    let foreign_value = saved_child
510                        .values
511                        .get(&relation.foreign_key)
512                        .cloned()
513                        .ok_or_else(|| {
514                            RepositoryError::Runtime(RuntimeError::Graph(format!(
515                                "saved child {} missing foreign key {} for relation {}.{}",
516                                relation.target_entity, relation.foreign_key, node.entity, name
517                            )))
518                        })?;
519                    node.values
520                        .insert(relation.local_key.clone(), foreign_value);
521                }
522                saved_children.push(saved_child);
523            }
524            node.relations.insert(name, saved_children);
525        }
526
527        let update = self.graph_update_command(&node, descriptor, id_property, &id);
528        if !update.values.is_empty() || update.expected_version.is_some() {
529            let prepared_update = self
530                .prepare_update_command(&update)
531                .map_err(RepositoryError::Runtime)?;
532            let lineage = active_scope.map(|s| s.to_lineage_string());
533            self.execute_prepared_update_with_comment(prepared_update.clone(), lineage)?;
534            for (field, value) in &prepared_update.values {
535                node.values.insert(field.clone(), value.clone());
536            }
537            if let Some(version_property) = descriptor.version_property() {
538                if let Some(expected_version) = prepared_update.expected_version {
539                    node.values.insert(
540                        version_property.name.clone(),
541                        Value::I64(expected_version + 1),
542                    );
543                }
544            }
545        }
546
547        for (name, relation, children) in many_relations {
548            let local_value = node
549                .values
550                .get(&relation.local_key)
551                .cloned()
552                .ok_or_else(|| {
553                    RepositoryError::Runtime(RuntimeError::Graph(format!(
554                        "parent {} missing local key {} for relation {}",
555                        node.entity, relation.local_key, name
556                    )))
557                })?;
558            let child_repo = self.scoped_repository(relation.target_entity.clone());
559            let existing_children = child_repo.fetch_graph_children(
560                &relation.target_entity,
561                &relation.foreign_key,
562                &local_value,
563                active_scope.map(|s| s.to_lineage_string()),
564            )?;
565            let child_descriptor = self
566                .repository
567                .metadata
568                .context
569                .require_entity(&relation.target_entity)
570                .map_err(RepositoryError::Runtime)?;
571            let child_id_property = child_descriptor.id_property().ok_or_else(|| {
572                RepositoryError::Runtime(RuntimeError::Graph(format!(
573                    "entity {} has no id property for graph diff",
574                    relation.target_entity
575                )))
576            })?;
577            let mut existing_by_id = BTreeMap::new();
578            for child in existing_children {
579                if let Some(id) = child.get(&child_id_property.name) {
580                    existing_by_id.insert(graph_identity_key(id), child);
581                }
582            }
583
584            let mut seen = std::collections::BTreeSet::new();
585            let mut saved_children = Vec::new();
586            for mut child in children {
587                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
588                if relation.attach && child.operation != GraphOperation::Reference {
589                    child
590                        .values
591                        .insert(relation.foreign_key.clone(), local_value.clone());
592                }
593                if let Some(child_id) = child
594                    .values
595                    .get(&child_id_property.name)
596                    .filter(|value| !is_unassigned_id_value(value))
597                {
598                    let key = graph_identity_key(child_id);
599                    if !seen.insert(key.clone()) {
600                        return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
601                            "duplicate child id {key} in relation {}.{}",
602                            node.entity, name
603                        ))));
604                    }
605                }
606                saved_children.push(child_repo.upsert_graph_node_scoped(child, active_scope)?);
607            }
608
609            if relation.delete_missing {
610                for (id_key, existing) in existing_by_id {
611                    if seen.contains(&id_key) {
612                        continue;
613                    }
614                    let Some(existing_id) = existing.get(&child_id_property.name).cloned() else {
615                        continue;
616                    };
617                    let mut delete =
618                        DeleteCommand::new(relation.target_entity.clone(), existing_id);
619                    if let Some(version) = graph_record_version(&existing, child_descriptor) {
620                        delete = delete.expected_version(version);
621                    }
622                    let lineage = active_scope.map(|s| s.to_lineage_string());
623                    child_repo.delete_scoped(&delete, lineage)?;
624                }
625            }
626
627            node.relations.insert(name, saved_children);
628        }
629
630        Ok(node)
631    }
632
633    fn validate_reference_node(
634        &self,
635        node: GraphNode,
636        lineage: Option<String>,
637    ) -> Result<GraphNode, RepositoryError<E::Error>> {
638        if !node.relations.is_empty() {
639            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
640                "reference node {} cannot contain child relations",
641                node.entity
642            ))));
643        }
644        let descriptor = self
645            .repository
646            .metadata
647            .context
648            .require_entity(&node.entity)
649            .map_err(RepositoryError::Runtime)?;
650        let id_property = descriptor.id_property().ok_or_else(|| {
651            RepositoryError::Runtime(RuntimeError::Graph(format!(
652                "entity {} has no id property for graph reference",
653                node.entity
654            )))
655        })?;
656        let id = node
657            .values
658            .get(&id_property.name)
659            .filter(|value| !is_unassigned_id_value(value))
660            .cloned()
661            .ok_or_else(|| {
662                RepositoryError::Runtime(RuntimeError::Graph(format!(
663                    "reference node {} missing id property {}",
664                    node.entity, id_property.name
665                )))
666            })?;
667
668        for field in node.values.keys() {
669            if field == &id_property.name {
670                continue;
671            }
672            if descriptor
673                .version_property()
674                .map(|property| field == &property.name)
675                .unwrap_or(false)
676            {
677                continue;
678            }
679            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
680                "reference node {} cannot carry mutable field {}",
681                node.entity, field
682            ))));
683        }
684
685        let current = self
686            .fetch_graph_current_row(&node.entity, &id_property.name, &id, lineage)?
687            .ok_or_else(|| {
688                RepositoryError::Runtime(RuntimeError::Graph(format!(
689                    "reference node {}({}) does not exist",
690                    node.entity,
691                    graph_identity_key(&id)
692                )))
693            })?;
694
695        if let Some(version_property) = descriptor.version_property() {
696            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
697                if *existing_version < 0 {
698                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
699                        "reference node {}({}) is deleted",
700                        node.entity,
701                        graph_identity_key(&id)
702                    ))));
703                }
704                if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
705                {
706                    if expected_version != existing_version {
707                        return Err(RepositoryError::Runtime(
708                            RuntimeError::OptimisticLockConflict {
709                                entity: node.entity,
710                                id: graph_identity_key(&id),
711                            },
712                        ));
713                    }
714                }
715            }
716        }
717
718        Ok(GraphNode {
719            entity: node.entity,
720            values: current,
721            relations: BTreeMap::new(),
722            operation: GraphOperation::Reference,
723            comment: None,
724        })
725    }
726
727    fn validate_remove_node(&self, node: &GraphNode, lineage: Option<String>) -> Result<(), RepositoryError<E::Error>> {
728        if !node.relations.is_empty() {
729            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
730                "remove node {} cannot contain child relations",
731                node.entity
732            ))));
733        }
734        let descriptor = self
735            .repository
736            .metadata
737            .context
738            .require_entity(&node.entity)
739            .map_err(RepositoryError::Runtime)?;
740        let id_property = descriptor.id_property().ok_or_else(|| {
741            RepositoryError::Runtime(RuntimeError::Graph(format!(
742                "entity {} has no id property for graph remove",
743                node.entity
744            )))
745        })?;
746        let id = node
747            .values
748            .get(&id_property.name)
749            .filter(|value| !is_unassigned_id_value(value))
750            .cloned()
751            .ok_or_else(|| {
752                RepositoryError::Runtime(RuntimeError::Graph(format!(
753                    "remove node {} missing id property {}",
754                    node.entity, id_property.name
755                )))
756            })?;
757        let current = self
758            .fetch_graph_current_row(&node.entity, &id_property.name, &id, lineage)?
759            .ok_or_else(|| {
760                RepositoryError::Runtime(RuntimeError::Graph(format!(
761                    "remove node {}({}) does not exist",
762                    node.entity,
763                    graph_identity_key(&id)
764                )))
765            })?;
766        if let Some(version_property) = descriptor.version_property() {
767            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
768                if *existing_version < 0 {
769                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
770                        "remove node {}({}) is already deleted",
771                        node.entity,
772                        graph_identity_key(&id)
773                    ))));
774                }
775            }
776        }
777        Ok(())
778    }
779
780    fn graph_node_from_record(
781        &self,
782        entity: &str,
783        record: Record,
784    ) -> Result<GraphNode, RuntimeError> {
785        let descriptor = self.repository.metadata.context.require_entity(entity)?;
786        let mut node = GraphNode::new(entity);
787
788        for (field, value) in record {
789            if field == "_comment" {
790                if let Value::Text(comment) = value {
791                    node.set_comment(comment);
792                }
793                continue;
794            }
795            let Some(relation) = descriptor.relation_by_name(&field) else {
796                node.values.insert(field, value);
797                continue;
798            };
799
800            match value {
801                Value::Null => {
802                    node.relations.entry(field).or_default();
803                }
804                Value::Object(record) => {
805                    let child = self.graph_node_from_record(&relation.target_entity, record)?;
806                    node.relations.entry(field).or_default().push(child);
807                }
808                Value::List(values) => {
809                    let children = node.relations.entry(field.clone()).or_default();
810                    for value in values {
811                        let Value::Object(record) = value else {
812                            return Err(RuntimeError::Graph(format!(
813                                "relation {}.{} expects object children, got {:?}",
814                                entity, field, value
815                            )));
816                        };
817                        children
818                            .push(self.graph_node_from_record(&relation.target_entity, record)?);
819                    }
820                }
821                other => {
822                    return Err(RuntimeError::Graph(format!(
823                        "relation {}.{} expects object/list/null, got {:?}",
824                        entity, field, other
825                    )));
826                }
827            }
828        }
829
830        Ok(node)
831    }
832
833    fn graph_update_command(
834        &self,
835        node: &GraphNode,
836        descriptor: &EntityDescriptor,
837        id_property: &PropertyDescriptor,
838        id: &Value,
839    ) -> UpdateCommand {
840        let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
841        if let Some(version_property) = descriptor.version_property() {
842            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
843                command = command.expected_version(*version);
844            }
845        }
846        for property in descriptor.properties.iter().filter(|property| {
847            !property.is_id && !property.is_version && node.values.contains_key(&property.name)
848        }) {
849            if property.name == id_property.name {
850                continue;
851            }
852            if let Some(value) = node.values.get(&property.name) {
853                command.values.insert(property.name.clone(), value.clone());
854            }
855        }
856        command
857    }
858
859    fn delete_graph_node<'s>(
860        &self,
861        node: &GraphNode,
862        parent_scope: Option<&'s ScopedCommentNode<'s>>,
863    ) -> Result<u64, RepositoryError<E::Error>> {
864        let descriptor = self
865            .repository
866            .metadata
867            .context
868            .require_entity(&node.entity)
869            .map_err(RepositoryError::Runtime)?;
870        let id_property = descriptor.id_property().ok_or_else(|| {
871            RepositoryError::Runtime(RuntimeError::Graph(format!(
872                "entity {} has no id property for graph remove",
873                node.entity
874            )))
875        })?;
876        let id = node
877            .values
878            .get(&id_property.name)
879            .filter(|value| !is_unassigned_id_value(value))
880            .cloned()
881            .ok_or_else(|| {
882                RepositoryError::Runtime(RuntimeError::Graph(format!(
883                    "remove node {} missing id property {}",
884                    node.entity, id_property.name
885                )))
886            })?;
887        let mut delete = DeleteCommand::new(node.entity.clone(), id);
888        if let Some(version_property) = descriptor.version_property() {
889            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
890                delete = delete.expected_version(*version);
891            }
892        }
893
894        // Create scope node for deletion if parent/node comment is present
895        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
896            parent: parent_scope,
897            track: CommentTrack {
898                entity_type: node.entity.clone(),
899                entity_id: node
900                    .id()
901                    .map(|v| match v {
902                        Value::U64(n) => n.to_string(),
903                        Value::I64(n) => n.to_string(),
904                        Value::Text(s) => s.clone(),
905                        other => format!("{other:?}"),
906                    })
907                    .unwrap_or_else(|| "(pending)".into()),
908                comment: c.clone(),
909            },
910        });
911        let active_scope = current_scope.as_ref().or(parent_scope);
912        let lineage = active_scope.map(|s| s.to_lineage_string());
913
914        self.delete_scoped(&delete, lineage)
915    }
916
917    fn fetch_graph_current_row(
918        &self,
919        entity: &str,
920        id_property: &str,
921        id: &Value,
922        lineage: Option<String>,
923    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
924        let mut query = SelectQuery::new(entity).filter(Expr::eq(id_property, id.clone()));
925        if let Some(lineage) = lineage {
926            query = query.comment(lineage);
927        }
928        let mut rows = self
929            .scoped_repository(entity.to_owned())
930            .fetch_all(&query)?;
931        Ok(rows.pop())
932    }
933
934    fn fetch_graph_children(
935        &self,
936        entity: &str,
937        foreign_key: &str,
938        parent_value: &Value,
939        lineage: Option<String>,
940    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
941        let mut query = SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone()));
942        if let Some(lineage) = lineage {
943            query = query.comment(lineage);
944        }
945        self.scoped_repository(entity.to_owned()).fetch_all(&query)
946    }
947}