Skip to main content

teaql_runtime/repository/
graph.rs

1use std::collections::BTreeMap;
2
3use teaql_core::{
4    DeleteCommand, Entity, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record,
5    SelectQuery, UpdateCommand, Value,
6};
7use teaql_sql::SqlDialect;
8
9use crate::{
10    CommentTrack, GraphMutationKind, GraphMutationPlan, GraphNode, GraphOperation, RepositoryError,
11    RuntimeError, ScopedCommentNode, sorted_update_fields,
12};
13use crate::entity_status::EntityStatus;
14
15use super::{GraphTransactionBoundary, QueryExecutor, ResolvedRepository, helpers::*};
16
17impl<'a, D, E> ResolvedRepository<'a, D, E>
18where
19    D: SqlDialect,
20    E: QueryExecutor,
21{
22    pub fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
23        if node.entity != self.entity {
24            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
25                "resolved repository {} cannot save graph root {}",
26                self.entity, node.entity
27            ))));
28        }
29        let boundary = self
30            .repository
31            .executor
32            .begin_transaction()
33            .map_err(RepositoryError::Executor)?;
34        if matches!(boundary, GraphTransactionBoundary::Unsupported) {
35            return Err(RepositoryError::Runtime(RuntimeError::Graph(
36                "save_graph requires a transactional executor".to_owned(),
37            )));
38        }
39        let result = self.upsert_graph_node_scoped(node, None);
40        match result {
41            Ok(saved) => {
42                if matches!(boundary, GraphTransactionBoundary::Started) {
43                    self.repository
44                        .executor
45                        .commit_transaction()
46                        .map_err(RepositoryError::Executor)?;
47                }
48                Ok(saved)
49            }
50            Err(err) => {
51                if !matches!(boundary, GraphTransactionBoundary::Unsupported) {
52                    self.repository
53                        .executor
54                        .rollback_transaction()
55                        .map_err(RepositoryError::Executor)?;
56                }
57                Err(err)
58            }
59        }
60    }
61
62
63    pub fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
64    where
65        T: Entity,
66    {
67        let node = self
68            .graph_node_from_entity(entity)
69            .map_err(RepositoryError::Runtime)?;
70        self.save_graph(node)
71    }
72
73    pub fn save_entity<T>(&self, entity: T, status: EntityStatus) -> Result<GraphNode, RepositoryError<E::Error>>
74    where
75        T: Entity,
76    {
77        if !status.need_persist() {
78            return Ok(GraphNode::new(&self.entity));
79        }
80        if status.is_deleted() {
81            let mut node = self.graph_node_from_entity(entity)
82                .map_err(RepositoryError::Runtime)?;
83            node.operation = GraphOperation::Remove;
84            node.relations.clear();
85            self.save_graph(node)
86        } else {
87            self.save_entity_graph(entity)
88        }
89    }
90    pub fn save_entity_with_comment<T>(&self, entity: T, status: EntityStatus, comment: impl Into<String>) -> Result<GraphNode, RepositoryError<E::Error>>
91    where
92        T: Entity,
93    {
94        if status.is_deleted() {
95            let mut node = self.graph_node_from_entity(entity)
96                .map_err(RepositoryError::Runtime)?;
97            node.operation = GraphOperation::Remove;
98            node.relations.clear();
99            node.set_comment(comment);
100            self.save_graph(node)
101        } else {
102            self.save_entity_graph_with_comment(entity, comment)
103        }
104    }
105    pub fn save_entity_graph_with_comment<T>(
106        &self,
107        entity: T,
108        comment: impl Into<String>,
109    ) -> Result<GraphNode, RepositoryError<E::Error>>
110    where
111        T: Entity,
112    {
113        let mut node = self
114            .graph_node_from_entity(entity)
115            .map_err(RepositoryError::Runtime)?;
116        node.set_comment(comment);
117        self.save_graph(node)
118    }
119
120    /// Create a new entity graph with an annotation comment on the root node.
121    /// This assumes all new nodes do not exist in the database, skipping existence checks
122    /// and throwing an exception on primary key conflict.
123    pub fn create_entity_graph_with_comment<T>(
124        &self,
125        entity: T,
126        comment: impl Into<String>,
127    ) -> Result<GraphNode, RepositoryError<E::Error>>
128    where
129        T: Entity,
130    {
131        let mut node = self
132            .graph_node_from_entity(entity)
133            .map_err(RepositoryError::Runtime)?;
134        node.operation = GraphOperation::Create;
135        node.set_comment(comment);
136        self.save_graph(node)
137    }
138
139    pub fn plan_graph(
140        &self,
141        node: GraphNode,
142    ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
143        if node.entity != self.entity {
144            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
145                "resolved repository {} cannot plan graph root {}",
146                self.entity, node.entity
147            ))));
148        }
149        let mut node = node;
150        let mut plan = GraphMutationPlan::default();
151        self.collect_graph_plan(&mut node, &mut plan, None, false)?;
152        plan.planned_root = Some(node);
153        plan.rebuild_batches();
154        Ok(plan)
155    }
156
157    pub fn execute_graph_plan(
158        &self,
159        plan: GraphMutationPlan,
160    ) -> Result<GraphNode, RepositoryError<E::Error>> {
161        let Some(root) = plan.planned_root else {
162            return Err(RepositoryError::Runtime(RuntimeError::Graph(
163                "graph mutation plan has no planned root".to_owned(),
164            )));
165        };
166
167        self.upsert_graph_node_scoped(root, None)
168    }
169
170    pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
171    where
172        T: Entity,
173    {
174        let descriptor = T::entity_descriptor();
175        if descriptor.name != self.entity {
176            return Err(RuntimeError::Graph(format!(
177                "resolved repository {} cannot extract graph root {}",
178                self.entity, descriptor.name
179            )));
180        }
181        self.graph_node_from_record(&descriptor.name, entity.into_record())
182    }
183
184    fn collect_graph_plan<'s>(
185        &self,
186        node: &mut GraphNode,
187        plan: &mut GraphMutationPlan,
188        parent_scope: Option<&'s ScopedCommentNode<'s>>,
189        parent_is_create: bool,
190    ) -> Result<(), RepositoryError<E::Error>> {
191        match node.operation {
192            GraphOperation::Reference => {
193                plan.push(
194                    node.entity.clone(),
195                    GraphMutationKind::Reference,
196                    node.values.clone(),
197                    Vec::new(),
198                );
199                return Ok(());
200            }
201            GraphOperation::Remove => {
202                plan.push(
203                    node.entity.clone(),
204                    GraphMutationKind::Delete,
205                    node.values.clone(),
206                    Vec::new(),
207                );
208                return Ok(());
209            }
210            GraphOperation::Upsert | GraphOperation::Create => {}
211        }
212
213        let descriptor = self
214            .repository
215            .metadata
216            .context
217            .require_entity(&node.entity)
218            .map_err(RepositoryError::Runtime)?;
219
220        // Create scope node on the current stack frame if this node has a comment
221        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
222            parent: parent_scope,
223            track: CommentTrack {
224                entity_type: node.entity.clone(),
225                entity_id: node
226                    .id()
227                    .map(|v| match v {
228                        Value::U64(n) => n.to_string(),
229                        Value::I64(n) => n.to_string(),
230                        Value::Text(s) => s.clone(),
231                        other => format!("{other:?}"),
232                    })
233                    .unwrap_or_else(|| "(pending)".into()),
234                comment: c.clone(),
235            },
236        });
237        let active_scope = current_scope.as_ref().or(parent_scope);
238
239        let id_property = descriptor.id_property().cloned();
240        let id = id_property.as_ref().and_then(|property| {
241            node.values
242                .get(&property.name)
243                .filter(|value| !is_unassigned_id_value(value))
244                .cloned()
245        });
246
247        let is_create_op = node.operation == GraphOperation::Create || (parent_is_create && node.operation == GraphOperation::Upsert);
248
249        let is_update = if is_create_op {
250            false
251        } else {
252            match (id_property.as_ref(), id.as_ref()) {
253                (Some(id_property), Some(id)) => self
254                    .fetch_graph_current_row(&node.entity, &id_property.name, id, active_scope.map(|s| s.to_lineage_string()))?
255                    .is_some(),
256                _ => false,
257            }
258        };
259        if !is_update {
260            if let Some(id_property) = id_property.as_ref() {
261                let needs_id = !node.values.contains_key(&id_property.name)
262                    || node
263                        .values
264                        .get(&id_property.name)
265                        .is_some_and(is_unassigned_id_value);
266                if needs_id {
267                    let id = self
268                        .repository
269                        .metadata
270                        .context
271                        .next_id(&node.entity)
272                        .map_err(RepositoryError::Runtime)?;
273                    node.values.insert(id_property.name.clone(), Value::U64(id));
274                }
275            }
276            ensure_initial_version(&mut node.values, descriptor);
277        }
278        let update_fields = if is_update {
279            let mut excluded = Vec::new();
280            if let Some(id_property) = id_property.as_ref() {
281                excluded.push(id_property.name.clone());
282            }
283            if let Some(version_property) = descriptor.version_property() {
284                excluded.push(version_property.name.clone());
285            }
286            sorted_update_fields(&node.values, excluded)
287        } else {
288            Vec::new()
289        };
290        plan.push(
291            node.entity.clone(),
292            if is_update {
293                GraphMutationKind::Update
294            } else {
295                GraphMutationKind::Create
296            },
297            node.values.clone(),
298            update_fields,
299        );
300
301        for (name, children) in &mut node.relations {
302            let relation = descriptor.relation_by_name(name).ok_or_else(|| {
303                RepositoryError::Runtime(RuntimeError::MissingRelation {
304                    entity: node.entity.clone(),
305                    relation: name.clone(),
306                })
307            })?;
308            let child_repo = self.scoped_repository(relation.target_entity.clone());
309            for child in children {
310                ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
311                child_repo.collect_graph_plan(child, plan, active_scope, is_create_op)?;
312            }
313        }
314        Ok(())
315    }
316
317    fn insert_graph_node_scoped<'s>(
318        &self,
319        mut node: GraphNode,
320        parent_scope: Option<&'s ScopedCommentNode<'s>>,
321    ) -> Result<GraphNode, RepositoryError<E::Error>> {
322        match node.operation {
323            GraphOperation::Upsert | GraphOperation::Create => {}
324            GraphOperation::Reference => return self.validate_reference_node(node, parent_scope.map(|s| s.to_lineage_string())),
325            GraphOperation::Remove => {
326                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
327                    "create graph cannot remove node {}",
328                    node.entity
329                ))));
330            }
331        }
332
333        // Create scope node on the current stack frame if this node has a comment
334        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
335            parent: parent_scope,
336            track: CommentTrack {
337                entity_type: node.entity.clone(),
338                entity_id: node
339                    .id()
340                    .map(|v| match v {
341                        Value::U64(n) => n.to_string(),
342                        Value::I64(n) => n.to_string(),
343                        Value::Text(s) => s.clone(),
344                        other => format!("{other:?}"),
345                    })
346                    .unwrap_or_else(|| "(pending)".into()),
347                comment: c.clone(),
348            },
349        });
350        let active_scope = current_scope.as_ref().or(parent_scope);
351
352        let descriptor = self
353            .repository
354            .metadata
355            .context
356            .require_entity(&node.entity)
357            .map_err(RepositoryError::Runtime)?;
358
359        let mut one_relations = Vec::new();
360        let mut many_relations = Vec::new();
361        for (name, children) in std::mem::take(&mut node.relations) {
362            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
363                RepositoryError::Runtime(RuntimeError::MissingRelation {
364                    entity: node.entity.clone(),
365                    relation: name.clone(),
366                })
367            })?;
368            if relation.many {
369                many_relations.push((name, relation.clone(), children));
370            } else {
371                one_relations.push((name, relation.clone(), children));
372            }
373        }
374
375        for (name, relation, children) in one_relations {
376            if children.len() > 1 {
377                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
378                    "relation {}.{} expects one child, got {}",
379                    node.entity,
380                    name,
381                    children.len()
382                ))));
383            }
384            let mut saved_children = Vec::new();
385            for child in children {
386                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
387                let child_repo = self.scoped_repository(child.entity.clone());
388                let saved_child = child_repo.insert_graph_node_scoped(child, active_scope)?;
389                if relation.attach {
390                    let foreign_value = saved_child
391                        .values
392                        .get(&relation.foreign_key)
393                        .cloned()
394                        .ok_or_else(|| {
395                            RepositoryError::Runtime(RuntimeError::Graph(format!(
396                                "saved child {} missing foreign key {} for relation {}.{}",
397                                relation.target_entity, relation.foreign_key, node.entity, name
398                            )))
399                        })?;
400                    node.values
401                        .insert(relation.local_key.clone(), foreign_value);
402                }
403                saved_children.push(saved_child);
404            }
405            node.relations.insert(name, saved_children);
406        }
407
408        let command = self
409            .prepare_insert_command(&InsertCommand {
410                entity: node.entity.clone(),
411                values: node.values.clone(),
412            })
413            .map_err(RepositoryError::Runtime)?;
414        let lineage = active_scope.map(|s| s.to_lineage_string());
415        self.execute_prepared_insert_with_comment(command.clone(), lineage)?;
416        node.values = command.values;
417
418        for (name, relation, children) in many_relations {
419            let local_value = node
420                .values
421                .get(&relation.local_key)
422                .cloned()
423                .ok_or_else(|| {
424                    RepositoryError::Runtime(RuntimeError::Graph(format!(
425                        "parent {} missing local key {} for relation {}",
426                        node.entity, relation.local_key, name
427                    )))
428                })?;
429            let mut saved_children = Vec::new();
430            for mut child in children {
431                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
432                if relation.attach {
433                    child
434                        .values
435                        .insert(relation.foreign_key.clone(), local_value.clone());
436                }
437                let child_repo = self.scoped_repository(child.entity.clone());
438                saved_children.push(child_repo.insert_graph_node_scoped(child, active_scope)?);
439            }
440            node.relations.insert(name, saved_children);
441        }
442
443        Ok(node)
444    }
445
446    fn upsert_graph_node_scoped<'s>(
447        &self,
448        mut node: GraphNode,
449        parent_scope: Option<&'s ScopedCommentNode<'s>>,
450    ) -> Result<GraphNode, RepositoryError<E::Error>> {
451        // Create scope node on the current stack frame if this node has a comment
452        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
453            parent: parent_scope,
454            track: CommentTrack {
455                entity_type: node.entity.clone(),
456                entity_id: node
457                    .id()
458                    .map(|v| match v {
459                        Value::U64(n) => n.to_string(),
460                        Value::I64(n) => n.to_string(),
461                        Value::Text(s) => s.clone(),
462                        other => format!("{other:?}"),
463                    })
464                    .unwrap_or_else(|| "(pending)".into()),
465                comment: c.clone(),
466            },
467        });
468        let active_scope = current_scope.as_ref().or(parent_scope);
469
470        match node.operation {
471            GraphOperation::Upsert | GraphOperation::Create => {}
472            GraphOperation::Reference => return self.validate_reference_node(node, active_scope.map(|s| s.to_lineage_string())),
473            GraphOperation::Remove => {
474                self.validate_remove_node(&node, active_scope.map(|s| s.to_lineage_string()))?;
475                self.delete_graph_node(&node, parent_scope)?;
476                return Ok(node);
477            }
478        }
479
480        let descriptor = self
481            .repository
482            .metadata
483            .context
484            .require_entity(&node.entity)
485            .map_err(RepositoryError::Runtime)?;
486        let Some(id_property) = descriptor.id_property() else {
487            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
488                "entity {} has no id property for graph upsert",
489                node.entity
490            ))));
491        };
492        let Some(id) = node
493            .values
494            .get(&id_property.name)
495            .filter(|value| !is_unassigned_id_value(value))
496            .cloned()
497        else {
498            // Strip comment to prevent duplicate scope — already captured in active_scope
499            node.comment = None;
500            return self.insert_graph_node_scoped(node, active_scope);
501        };
502
503        if node.operation == GraphOperation::Create || self
504            .fetch_graph_current_row(&node.entity, &id_property.name, &id, active_scope.map(|s| s.to_lineage_string()))?
505            .is_none()
506        {
507            node.comment = None;
508            return self.insert_graph_node_scoped(node, active_scope);
509        }
510
511        let mut one_relations = Vec::new();
512        let mut many_relations = Vec::new();
513        for (name, children) in std::mem::take(&mut node.relations) {
514            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
515                RepositoryError::Runtime(RuntimeError::MissingRelation {
516                    entity: node.entity.clone(),
517                    relation: name.clone(),
518                })
519            })?;
520            if relation.many {
521                many_relations.push((name, relation.clone(), children));
522            } else {
523                one_relations.push((name, relation.clone(), children));
524            }
525        }
526
527        for (name, relation, children) in one_relations {
528            if children.len() > 1 {
529                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
530                    "relation {}.{} expects one child, got {}",
531                    node.entity,
532                    name,
533                    children.len()
534                ))));
535            }
536            let mut saved_children = Vec::new();
537            for child in children {
538                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
539                let child_repo = self.scoped_repository(child.entity.clone());
540                let saved_child = child_repo.upsert_graph_node_scoped(child, active_scope)?;
541                if relation.attach {
542                    let foreign_value = saved_child
543                        .values
544                        .get(&relation.foreign_key)
545                        .cloned()
546                        .ok_or_else(|| {
547                            RepositoryError::Runtime(RuntimeError::Graph(format!(
548                                "saved child {} missing foreign key {} for relation {}.{}",
549                                relation.target_entity, relation.foreign_key, node.entity, name
550                            )))
551                        })?;
552                    node.values
553                        .insert(relation.local_key.clone(), foreign_value);
554                }
555                saved_children.push(saved_child);
556            }
557            node.relations.insert(name, saved_children);
558        }
559
560        let update = self.graph_update_command(&node, descriptor, id_property, &id);
561        if !update.values.is_empty() || update.expected_version.is_some() {
562            let prepared_update = self
563                .prepare_update_command(&update)
564                .map_err(RepositoryError::Runtime)?;
565            let lineage = active_scope.map(|s| s.to_lineage_string());
566            self.execute_prepared_update_with_comment(prepared_update.clone(), lineage)?;
567            for (field, value) in &prepared_update.values {
568                node.values.insert(field.clone(), value.clone());
569            }
570            if let Some(version_property) = descriptor.version_property() {
571                if let Some(expected_version) = prepared_update.expected_version {
572                    node.values.insert(
573                        version_property.name.clone(),
574                        Value::I64(expected_version + 1),
575                    );
576                }
577            }
578        }
579
580        for (name, relation, children) in many_relations {
581            let local_value = node
582                .values
583                .get(&relation.local_key)
584                .cloned()
585                .ok_or_else(|| {
586                    RepositoryError::Runtime(RuntimeError::Graph(format!(
587                        "parent {} missing local key {} for relation {}",
588                        node.entity, relation.local_key, name
589                    )))
590                })?;
591            let child_repo = self.scoped_repository(relation.target_entity.clone());
592            let existing_children = child_repo.fetch_graph_children(
593                &relation.target_entity,
594                &relation.foreign_key,
595                &local_value,
596                active_scope.map(|s| s.to_lineage_string()),
597            )?;
598            let child_descriptor = self
599                .repository
600                .metadata
601                .context
602                .require_entity(&relation.target_entity)
603                .map_err(RepositoryError::Runtime)?;
604            let child_id_property = child_descriptor.id_property().ok_or_else(|| {
605                RepositoryError::Runtime(RuntimeError::Graph(format!(
606                    "entity {} has no id property for graph diff",
607                    relation.target_entity
608                )))
609            })?;
610            let mut existing_by_id = BTreeMap::new();
611            for child in existing_children {
612                if let Some(id) = child.get(&child_id_property.name) {
613                    existing_by_id.insert(graph_identity_key(id), child);
614                }
615            }
616
617            let mut seen = std::collections::BTreeSet::new();
618            let mut saved_children = Vec::new();
619            for mut child in children {
620                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
621                if relation.attach && child.operation != GraphOperation::Reference {
622                    child
623                        .values
624                        .insert(relation.foreign_key.clone(), local_value.clone());
625                }
626                if let Some(child_id) = child
627                    .values
628                    .get(&child_id_property.name)
629                    .filter(|value| !is_unassigned_id_value(value))
630                {
631                    let key = graph_identity_key(child_id);
632                    if !seen.insert(key.clone()) {
633                        return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
634                            "duplicate child id {key} in relation {}.{}",
635                            node.entity, name
636                        ))));
637                    }
638                }
639                saved_children.push(child_repo.upsert_graph_node_scoped(child, active_scope)?);
640            }
641
642            if relation.delete_missing {
643                for (id_key, existing) in existing_by_id {
644                    if seen.contains(&id_key) {
645                        continue;
646                    }
647                    let Some(existing_id) = existing.get(&child_id_property.name).cloned() else {
648                        continue;
649                    };
650                    let mut delete =
651                        DeleteCommand::new(relation.target_entity.clone(), existing_id);
652                    if let Some(version) = graph_record_version(&existing, child_descriptor) {
653                        delete = delete.expected_version(version);
654                    }
655                    let lineage = active_scope.map(|s| s.to_lineage_string());
656                    child_repo.delete_scoped(&delete, lineage)?;
657                }
658            }
659
660            node.relations.insert(name, saved_children);
661        }
662
663        Ok(node)
664    }
665
666    fn validate_reference_node(
667        &self,
668        node: GraphNode,
669        lineage: Option<String>,
670    ) -> Result<GraphNode, RepositoryError<E::Error>> {
671        if !node.relations.is_empty() {
672            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
673                "reference node {} cannot contain child relations",
674                node.entity
675            ))));
676        }
677        let descriptor = self
678            .repository
679            .metadata
680            .context
681            .require_entity(&node.entity)
682            .map_err(RepositoryError::Runtime)?;
683        let id_property = descriptor.id_property().ok_or_else(|| {
684            RepositoryError::Runtime(RuntimeError::Graph(format!(
685                "entity {} has no id property for graph reference",
686                node.entity
687            )))
688        })?;
689        let id = node
690            .values
691            .get(&id_property.name)
692            .filter(|value| !is_unassigned_id_value(value))
693            .cloned()
694            .ok_or_else(|| {
695                RepositoryError::Runtime(RuntimeError::Graph(format!(
696                    "reference node {} missing id property {}",
697                    node.entity, id_property.name
698                )))
699            })?;
700
701        for field in node.values.keys() {
702            if field == &id_property.name {
703                continue;
704            }
705            if descriptor
706                .version_property()
707                .map(|property| field == &property.name)
708                .unwrap_or(false)
709            {
710                continue;
711            }
712            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
713                "reference node {} cannot carry mutable field {}",
714                node.entity, field
715            ))));
716        }
717
718        let current = self
719            .fetch_graph_current_row(&node.entity, &id_property.name, &id, lineage)?
720            .ok_or_else(|| {
721                RepositoryError::Runtime(RuntimeError::Graph(format!(
722                    "reference node {}({}) does not exist",
723                    node.entity,
724                    graph_identity_key(&id)
725                )))
726            })?;
727
728        if let Some(version_property) = descriptor.version_property() {
729            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
730                if *existing_version < 0 {
731                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
732                        "reference node {}({}) is deleted",
733                        node.entity,
734                        graph_identity_key(&id)
735                    ))));
736                }
737                if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
738                {
739                    if expected_version != existing_version {
740                        return Err(RepositoryError::Runtime(
741                            RuntimeError::OptimisticLockConflict {
742                                entity: node.entity,
743                                id: graph_identity_key(&id),
744                            },
745                        ));
746                    }
747                }
748            }
749        }
750
751        Ok(GraphNode {
752            entity: node.entity,
753            values: current,
754            relations: BTreeMap::new(),
755            operation: GraphOperation::Reference,
756            comment: None,
757        })
758    }
759
760    fn validate_remove_node(&self, node: &GraphNode, lineage: Option<String>) -> Result<(), RepositoryError<E::Error>> {
761        if !node.relations.is_empty() {
762            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
763                "remove node {} cannot contain child relations",
764                node.entity
765            ))));
766        }
767        let descriptor = self
768            .repository
769            .metadata
770            .context
771            .require_entity(&node.entity)
772            .map_err(RepositoryError::Runtime)?;
773        let id_property = descriptor.id_property().ok_or_else(|| {
774            RepositoryError::Runtime(RuntimeError::Graph(format!(
775                "entity {} has no id property for graph remove",
776                node.entity
777            )))
778        })?;
779        let id = node
780            .values
781            .get(&id_property.name)
782            .filter(|value| !is_unassigned_id_value(value))
783            .cloned()
784            .ok_or_else(|| {
785                RepositoryError::Runtime(RuntimeError::Graph(format!(
786                    "remove node {} missing id property {}",
787                    node.entity, id_property.name
788                )))
789            })?;
790        let current = self
791            .fetch_graph_current_row(&node.entity, &id_property.name, &id, lineage)?
792            .ok_or_else(|| {
793                RepositoryError::Runtime(RuntimeError::Graph(format!(
794                    "remove node {}({}) does not exist",
795                    node.entity,
796                    graph_identity_key(&id)
797                )))
798            })?;
799        if let Some(version_property) = descriptor.version_property() {
800            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
801                if *existing_version < 0 {
802                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
803                        "remove node {}({}) is already deleted",
804                        node.entity,
805                        graph_identity_key(&id)
806                    ))));
807                }
808            }
809        }
810        Ok(())
811    }
812
813    fn graph_node_from_record(
814        &self,
815        entity: &str,
816        record: Record,
817    ) -> Result<GraphNode, RuntimeError> {
818        let descriptor = self.repository.metadata.context.require_entity(entity)?;
819        let mut node = GraphNode::new(entity);
820
821        for (field, value) in record {
822            if field == "_comment" {
823                if let Value::Text(comment) = value {
824                    node.set_comment(comment);
825                }
826                continue;
827            }
828            let Some(relation) = descriptor.relation_by_name(&field) else {
829                node.values.insert(field, value);
830                continue;
831            };
832
833            match value {
834                Value::Null => {
835                    node.relations.entry(field).or_default();
836                }
837                Value::Object(record) => {
838                    let child = self.graph_node_from_record(&relation.target_entity, record)?;
839                    node.relations.entry(field).or_default().push(child);
840                }
841                Value::List(values) => {
842                    let children = node.relations.entry(field.clone()).or_default();
843                    for value in values {
844                        let Value::Object(record) = value else {
845                            return Err(RuntimeError::Graph(format!(
846                                "relation {}.{} expects object children, got {:?}",
847                                entity, field, value
848                            )));
849                        };
850                        children
851                            .push(self.graph_node_from_record(&relation.target_entity, record)?);
852                    }
853                }
854                other => {
855                    return Err(RuntimeError::Graph(format!(
856                        "relation {}.{} expects object/list/null, got {:?}",
857                        entity, field, other
858                    )));
859                }
860            }
861        }
862
863        Ok(node)
864    }
865
866    fn graph_update_command(
867        &self,
868        node: &GraphNode,
869        descriptor: &EntityDescriptor,
870        id_property: &PropertyDescriptor,
871        id: &Value,
872    ) -> UpdateCommand {
873        let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
874        if let Some(version_property) = descriptor.version_property() {
875            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
876                command = command.expected_version(*version);
877            }
878        }
879        for property in descriptor.properties.iter().filter(|property| {
880            !property.is_id && !property.is_version && node.values.contains_key(&property.name)
881        }) {
882            if property.name == id_property.name {
883                continue;
884            }
885            if let Some(value) = node.values.get(&property.name) {
886                command.values.insert(property.name.clone(), value.clone());
887            }
888        }
889        command
890    }
891
892    fn delete_graph_node<'s>(
893        &self,
894        node: &GraphNode,
895        parent_scope: Option<&'s ScopedCommentNode<'s>>,
896    ) -> Result<u64, RepositoryError<E::Error>> {
897        let descriptor = self
898            .repository
899            .metadata
900            .context
901            .require_entity(&node.entity)
902            .map_err(RepositoryError::Runtime)?;
903        let id_property = descriptor.id_property().ok_or_else(|| {
904            RepositoryError::Runtime(RuntimeError::Graph(format!(
905                "entity {} has no id property for graph remove",
906                node.entity
907            )))
908        })?;
909        let id = node
910            .values
911            .get(&id_property.name)
912            .filter(|value| !is_unassigned_id_value(value))
913            .cloned()
914            .ok_or_else(|| {
915                RepositoryError::Runtime(RuntimeError::Graph(format!(
916                    "remove node {} missing id property {}",
917                    node.entity, id_property.name
918                )))
919            })?;
920        let mut delete = DeleteCommand::new(node.entity.clone(), id);
921        if let Some(version_property) = descriptor.version_property() {
922            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
923                delete = delete.expected_version(*version);
924            }
925        }
926
927        // Create scope node for deletion if parent/node comment is present
928        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
929            parent: parent_scope,
930            track: CommentTrack {
931                entity_type: node.entity.clone(),
932                entity_id: node
933                    .id()
934                    .map(|v| match v {
935                        Value::U64(n) => n.to_string(),
936                        Value::I64(n) => n.to_string(),
937                        Value::Text(s) => s.clone(),
938                        other => format!("{other:?}"),
939                    })
940                    .unwrap_or_else(|| "(pending)".into()),
941                comment: c.clone(),
942            },
943        });
944        let active_scope = current_scope.as_ref().or(parent_scope);
945        let lineage = active_scope.map(|s| s.to_lineage_string());
946
947        self.delete_scoped(&delete, lineage)
948    }
949
950    fn fetch_graph_current_row(
951        &self,
952        entity: &str,
953        id_property: &str,
954        id: &Value,
955        lineage: Option<String>,
956    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
957        let mut query = SelectQuery::new(entity).filter(Expr::eq(id_property, id.clone()));
958        if let Some(lineage) = lineage {
959            query = query.comment(lineage);
960        }
961        let mut rows = self
962            .scoped_repository(entity.to_owned())
963            .fetch_all(&query)?;
964        Ok(rows.pop())
965    }
966
967    fn fetch_graph_children(
968        &self,
969        entity: &str,
970        foreign_key: &str,
971        parent_value: &Value,
972        lineage: Option<String>,
973    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
974        let mut query = SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone()));
975        if let Some(lineage) = lineage {
976            query = query.comment(lineage);
977        }
978        self.scoped_repository(entity.to_owned()).fetch_all(&query)
979    }
980}