Skip to main content

teaql_runtime/repository/
graph.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use teaql_core::{
5    DeleteCommand, Entity, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record,
6    SelectQuery, UpdateCommand, Value,
7};
8use teaql_sql::SqlDialect;
9
10use crate::{
11    GraphMutationKind, GraphMutationPlan, GraphNode, GraphOperation, RepositoryError,
12    RuntimeError, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
13};
14use crate::entity_status::EntityStatus;
15
16use super::{GraphTransactionBoundary, QueryExecutor, ResolvedRepository, helpers::*};
17
18impl<'a, D, E> ResolvedRepository<'a, D, E>
19where
20    D: SqlDialect,
21    E: QueryExecutor,
22{
23    pub fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
24        if node.entity != self.entity {
25            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
26                "resolved repository {} cannot save graph root {}",
27                self.entity, node.entity
28            ))));
29        }
30        let boundary = self
31            .repository
32            .executor
33            .begin_transaction()
34            .map_err(RepositoryError::Executor)?;
35        if matches!(boundary, GraphTransactionBoundary::Unsupported) {
36            return Err(RepositoryError::Runtime(RuntimeError::Graph(
37                "save_graph requires a transactional executor".to_owned(),
38            )));
39        }
40        let result = self.upsert_graph_node_scoped(node, None);
41        match result {
42            Ok(saved) => {
43                if matches!(boundary, GraphTransactionBoundary::Started) {
44                    self.repository
45                        .executor
46                        .commit_transaction()
47                        .map_err(RepositoryError::Executor)?;
48                }
49                Ok(saved)
50            }
51            Err(err) => {
52                if !matches!(boundary, GraphTransactionBoundary::Unsupported) {
53                    self.repository
54                        .executor
55                        .rollback_transaction()
56                        .map_err(RepositoryError::Executor)?;
57                }
58                Err(err)
59            }
60        }
61    }
62
63    pub fn save_entity_graph_from(&self, graph: teaql_core::EntityGraph) -> Result<GraphNode, RepositoryError<E::Error>> {
64        fn convert(node: teaql_core::EntityGraphNode) -> GraphNode {
65            let mut relations = BTreeMap::new();
66            for (rel_name, child) in node.children {
67                relations.entry(rel_name).or_insert_with(Vec::new).push(convert(child));
68            }
69            GraphNode {
70                entity: node.entity_type,
71                values: node.record,
72                relations,
73                operation: match node.operation {
74                    teaql_core::EntityGraphOperation::Save => crate::GraphOperation::Upsert,
75                    teaql_core::EntityGraphOperation::Delete => crate::GraphOperation::Remove,
76                },
77                comment: node.comment,
78                dirty_fields: None, original_values: None,
79            }
80        }
81        self.save_graph(convert(graph.root))
82    }
83
84    pub fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
85    where
86        T: Entity,
87    {
88        let node = self
89            .graph_node_from_entity(entity)
90            .map_err(RepositoryError::Runtime)?;
91        self.save_graph(node)
92    }
93
94    pub fn save_entity<T>(&self, entity: T, status: EntityStatus) -> Result<GraphNode, RepositoryError<E::Error>>
95    where
96        T: Entity,
97    {
98        if !status.need_persist() {
99            return Ok(GraphNode::new(&self.entity));
100        }
101        if status.is_deleted() {
102            let mut node = self.graph_node_from_entity(entity)
103                .map_err(RepositoryError::Runtime)?;
104            node.operation = GraphOperation::Remove;
105            node.relations.clear();
106            self.save_graph(node)
107        } else {
108            self.save_entity_graph(entity)
109        }
110    }
111    pub fn save_entity_with_comment<T>(&self, entity: T, status: EntityStatus, comment: impl Into<String>) -> Result<GraphNode, RepositoryError<E::Error>>
112    where
113        T: Entity,
114    {
115        if status.is_deleted() {
116            let mut node = self.graph_node_from_entity(entity)
117                .map_err(RepositoryError::Runtime)?;
118            node.operation = GraphOperation::Remove;
119            node.relations.clear();
120            node.set_comment(comment);
121            self.save_graph(node)
122        } else {
123            self.save_entity_graph_with_comment(entity, comment)
124        }
125    }
126    pub fn save_entity_graph_with_comment<T>(
127        &self,
128        entity: T,
129        comment: impl Into<String>,
130    ) -> Result<GraphNode, RepositoryError<E::Error>>
131    where
132        T: Entity,
133    {
134        let mut node = self
135            .graph_node_from_entity(entity)
136            .map_err(RepositoryError::Runtime)?;
137        node.set_comment(comment);
138        self.save_graph(node)
139    }
140
141    /// Create a new entity graph with an annotation comment on the root node.
142    /// This assumes all new nodes do not exist in the database, skipping existence checks
143    /// and throwing an exception on primary key conflict.
144    pub fn create_entity_graph_with_comment<T>(
145        &self,
146        entity: T,
147        comment: impl Into<String>,
148    ) -> Result<GraphNode, RepositoryError<E::Error>>
149    where
150        T: Entity,
151    {
152        let mut node = self
153            .graph_node_from_entity(entity)
154            .map_err(RepositoryError::Runtime)?;
155        node.operation = GraphOperation::Create;
156        node.set_comment(comment);
157        self.save_graph(node)
158    }
159
160    pub fn plan_graph(
161        &self,
162        node: GraphNode,
163    ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
164        if node.entity != self.entity {
165            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
166                "resolved repository {} cannot plan graph root {}",
167                self.entity, node.entity
168            ))));
169        }
170        let mut node = node;
171        let mut plan = GraphMutationPlan::default();
172        self.collect_graph_plan(&mut node, &mut plan, None, None, false)?;
173        plan.planned_root = Some(node);
174        plan.rebuild_batches();
175        Ok(plan)
176    }
177
178    pub fn execute_graph_plan(
179        &self,
180        plan: GraphMutationPlan,
181    ) -> Result<GraphNode, RepositoryError<E::Error>> {
182        let Some(root) = plan.planned_root else {
183            return Err(RepositoryError::Runtime(RuntimeError::Graph(
184                "graph mutation plan has no planned root".to_owned(),
185            )));
186        };
187
188        for batch in plan.batches {
189            if batch.items.is_empty() {
190                continue;
191            }
192            match batch.kind {
193                GraphMutationKind::Create => {
194                    let mut cmd = teaql_core::BatchInsertCommand::new(&batch.entity);
195                    for item in batch.items {
196                        cmd.batch_values.push(item.values);
197                        if let Some(token) = item.scope_token {
198                            cmd.trace_chains.push(token.recover_trace_chain());
199                        } else {
200                            cmd.trace_chains.push(Vec::new());
201                        }
202                    }
203                    self.execute_prepared_batch_insert(cmd)?;
204                }
205                GraphMutationKind::Update => {
206                    let mut cmd = teaql_core::BatchUpdateCommand::new(&batch.entity, batch.update_fields);
207                    for item in batch.items {
208                        let id = item.values.get("id").cloned().ok_or_else(|| {
209                            RepositoryError::Runtime(RuntimeError::Graph(format!(
210                                "update item in batch missing id for {}", batch.entity
211                            )))
212                        })?;
213                        let version = item.values.get("version").and_then(|v| {
214                            if let teaql_core::Value::I64(n) = v { Some(*n) } else { None }
215                        });
216                        cmd.batch_values.push(item.values);
217                        cmd.batch_ids.push(id);
218                        cmd.batch_expected_versions.push(version);
219                        cmd.batch_old_values.push(item.old_values);
220                        if let Some(token) = item.scope_token {
221                            cmd.trace_chains.push(token.recover_trace_chain());
222                        } else {
223                            cmd.trace_chains.push(Vec::new());
224                        }
225                    }
226                    self.execute_prepared_batch_update(cmd)?;
227                }
228                GraphMutationKind::Delete => {
229                    // For now, loop individually since we lack BatchDeleteCommand
230                    for item in batch.items {
231                        let id = item.values.get("id").cloned().ok_or_else(|| {
232                            RepositoryError::Runtime(RuntimeError::Graph(format!(
233                                "delete item in batch missing id for {}", batch.entity
234                            )))
235                        })?;
236                        let mut cmd = teaql_core::DeleteCommand::new(&batch.entity, id);
237                        if let Some(teaql_core::Value::I64(version)) = item.values.get("version") {
238                            cmd = cmd.expected_version(*version);
239                        }
240                        let trace_chain = if let Some(token) = item.scope_token { token.recover_trace_chain() } else { Vec::new() };
241                        self.delete_scoped(&cmd, trace_chain)?;
242                    }
243                }
244                GraphMutationKind::Reference => {
245                    // References are skipped in execution, they only validate during traversal
246                }
247            }
248        }
249
250        Ok(root)
251    }
252
253    pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
254    where
255        T: Entity,
256    {
257        let descriptor = T::entity_descriptor();
258        if descriptor.name != self.entity {
259            return Err(RuntimeError::Graph(format!(
260                "resolved repository {} cannot extract graph root {}",
261                self.entity, descriptor.name
262            )));
263        }
264        // Extract dirty field names BEFORE into_record() consumes the entity.
265        // This is the Rust equivalent of Java's entity.getUpdatedProperties().
266        let dirty_fields = entity.dirty_fields();
267        let original_values = entity.original_values();
268        let is_deleted = entity.is_marked_as_delete();
269        let comment = entity.comment();
270        let mut node = self.graph_node_from_record(&descriptor.name, entity.into_record())?;
271        node.dirty_fields = dirty_fields;
272        node.original_values = original_values;
273        if is_deleted {
274            node.operation = GraphOperation::Remove;
275            node.relations.clear();
276        }
277        if let Some(c) = comment {
278            node.set_comment(c);
279        }
280        Ok(node)
281    }
282
283    fn collect_graph_plan<'s>(
284        &self,
285        node: &mut GraphNode,
286        plan: &mut GraphMutationPlan,
287        parent_scope: Option<&'s ScopedCommentNode<'s>>,
288        parent_token: Option<Arc<TraceScopeToken>>,
289        parent_is_create: bool,
290    ) -> Result<(), RepositoryError<E::Error>> {
291        match node.operation {
292            GraphOperation::Reference => {
293                plan.push(
294                    node.entity.clone(),
295                    GraphMutationKind::Reference,
296                    node.values.clone(),
297                    Vec::new(),
298                    parent_token,
299                    node.original_values.clone(),
300                );
301                return Ok(());
302            }
303            GraphOperation::Remove => {
304                plan.push(
305                    node.entity.clone(),
306                    GraphMutationKind::Delete,
307                    node.values.clone(),
308                    Vec::new(),
309                    parent_token,
310                    node.original_values.clone(),
311                );
312                return Ok(());
313            }
314            GraphOperation::Upsert | GraphOperation::Create => {}
315        }
316
317        let descriptor = self
318            .repository
319            .metadata
320            .context
321            .require_entity(&node.entity)
322            .map_err(RepositoryError::Runtime)?;
323
324        // Create scope node on the current stack frame if this node has a comment
325        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
326            parent: parent_scope,
327            track: teaql_core::TraceNode {
328                entity_type: node.entity.clone(),
329                entity_id: node.id().and_then(|v| match v {
330                    Value::U64(n) => Some(*n),
331                    Value::I64(n) => Some(*n as u64),
332                    _ => None,
333                }),
334                comment: c.clone(),
335            },
336        });
337        let active_scope = current_scope.as_ref().or(parent_scope);
338
339        let id_property = descriptor.id_property().cloned();
340        let id = id_property.as_ref().and_then(|property| {
341            node.values
342                .get(&property.name)
343                .filter(|value| !is_unassigned_id_value(value))
344                .cloned()
345        });
346
347        let is_create_op = node.operation == GraphOperation::Create || (parent_is_create && node.operation == GraphOperation::Upsert);
348
349        let is_update = if is_create_op {
350            false
351        } else {
352            match (id_property.as_ref(), id.as_ref()) {
353                (Some(id_property), Some(id)) => self
354                    .fetch_graph_current_row(&node.entity, &id_property.name, id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default())?
355                    .is_some(),
356                _ => false,
357            }
358        };
359        if !is_update {
360            if let Some(id_property) = id_property.as_ref() {
361                let needs_id = !node.values.contains_key(&id_property.name)
362                    || node
363                        .values
364                        .get(&id_property.name)
365                        .is_some_and(is_unassigned_id_value);
366                if needs_id {
367                    let id = self
368                        .repository
369                        .metadata
370                        .context
371                        .next_id(&node.entity)
372                        .map_err(RepositoryError::Runtime)?;
373                    node.values.insert(id_property.name.clone(), Value::U64(id));
374                }
375            }
376            ensure_initial_version(&mut node.values, descriptor);
377        }
378        let update_fields = if is_update {
379            let mut excluded = Vec::new();
380            if let Some(id_property) = id_property.as_ref() {
381                excluded.push(id_property.name.clone());
382            }
383            if let Some(version_property) = descriptor.version_property() {
384                excluded.push(version_property.name.clone());
385            }
386            sorted_update_fields(&node.values, excluded)
387        } else {
388            Vec::new()
389        };
390
391        // Build the TraceScopeToken for this node (only if it has a comment).
392        // This is an Arc-linked persistent list: zero-copy, O(1) creation.
393        let current_token = if let Some(c) = &node.comment {
394            Some(Arc::new(TraceScopeToken {
395                parent: parent_token.clone(),
396                track: teaql_core::TraceNode {
397                    entity_type: node.entity.clone(),
398                    entity_id: node.id().and_then(|v| match v {
399                        Value::U64(n) => Some(*n),
400                        Value::I64(n) => Some(*n as u64),
401                        _ => None,
402                    }),
403                    comment: c.clone(),
404                },
405                node_index: plan.next_item_index,
406            }))
407        } else {
408            parent_token.clone()
409        };
410
411        plan.push(
412            node.entity.clone(),
413            if is_update {
414                GraphMutationKind::Update
415            } else {
416                GraphMutationKind::Create
417            },
418            node.values.clone(),
419            update_fields,
420            current_token.clone(),
421            node.original_values.clone(),
422        );
423
424        for (name, children) in &mut node.relations {
425            let relation = descriptor.relation_by_name(name).ok_or_else(|| {
426                RepositoryError::Runtime(RuntimeError::MissingRelation {
427                    entity: node.entity.clone(),
428                    relation: name.clone(),
429                })
430            })?;
431            let child_repo = self.scoped_repository(relation.target_entity.clone());
432            for child in children {
433                ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
434                child_repo.collect_graph_plan(child, plan, active_scope, current_token.clone(), is_create_op)?;
435            }
436        }
437        Ok(())
438    }
439
440    fn insert_graph_node_scoped<'s>(
441        &self,
442        mut node: GraphNode,
443        parent_scope: Option<&'s ScopedCommentNode<'s>>,
444    ) -> Result<GraphNode, RepositoryError<E::Error>> {
445        match node.operation {
446            GraphOperation::Upsert | GraphOperation::Create => {}
447            GraphOperation::Reference => return self.validate_reference_node(node, parent_scope.map(|s| s.to_trace_chain()).unwrap_or_default()),
448            GraphOperation::Remove => {
449                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
450                    "create graph cannot remove node {}",
451                    node.entity
452                ))));
453            }
454        }
455
456        // Create scope node on the current stack frame if this node has a comment
457        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
458            parent: parent_scope,
459            track: teaql_core::TraceNode {
460                entity_type: node.entity.clone(),
461                entity_id: node
462                    .id()
463                    .and_then(|v| match v {
464                        Value::U64(n) => Some(*n),
465                        Value::I64(n) => Some(*n as u64),
466                        _ => None,
467                    }),
468                comment: c.clone(),
469            },
470        });
471        let active_scope = current_scope.as_ref().or(parent_scope);
472
473        let descriptor = self
474            .repository
475            .metadata
476            .context
477            .require_entity(&node.entity)
478            .map_err(RepositoryError::Runtime)?;
479
480        let mut one_relations = Vec::new();
481        let mut many_relations = Vec::new();
482        for (name, children) in std::mem::take(&mut node.relations) {
483            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
484                RepositoryError::Runtime(RuntimeError::MissingRelation {
485                    entity: node.entity.clone(),
486                    relation: name.clone(),
487                })
488            })?;
489            if relation.many {
490                many_relations.push((name, relation.clone(), children));
491            } else {
492                one_relations.push((name, relation.clone(), children));
493            }
494        }
495
496        for (name, relation, children) in one_relations {
497            if children.len() > 1 {
498                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
499                    "relation {}.{} expects one child, got {}",
500                    node.entity,
501                    name,
502                    children.len()
503                ))));
504            }
505            let mut saved_children = Vec::new();
506            for child in children {
507                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
508                let child_repo = self.scoped_repository(child.entity.clone());
509                let saved_child = child_repo.insert_graph_node_scoped(child, active_scope)?;
510                if relation.attach {
511                    let foreign_value = saved_child
512                        .values
513                        .get(&relation.foreign_key)
514                        .cloned()
515                        .ok_or_else(|| {
516                            RepositoryError::Runtime(RuntimeError::Graph(format!(
517                                "saved child {} missing foreign key {} for relation {}.{}",
518                                relation.target_entity, relation.foreign_key, node.entity, name
519                            )))
520                        })?;
521                    node.values
522                        .insert(relation.local_key.clone(), foreign_value);
523                }
524                saved_children.push(saved_child);
525            }
526            node.relations.insert(name, saved_children);
527        }
528
529        let command = self
530            .prepare_insert_command(&InsertCommand {
531                entity: node.entity.clone(),
532                values: node.values.clone(),
533                trace_chain: Vec::new(),
534            })
535            .map_err(RepositoryError::Runtime)?;
536        let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
537        self.execute_prepared_insert_with_comment(command.clone(), lineage)?;
538        node.values = command.values;
539
540        for (name, relation, children) in many_relations {
541            let local_value = node
542                .values
543                .get(&relation.local_key)
544                .cloned()
545                .ok_or_else(|| {
546                    RepositoryError::Runtime(RuntimeError::Graph(format!(
547                        "parent {} missing local key {} for relation {}",
548                        node.entity, relation.local_key, name
549                    )))
550                })?;
551            let mut saved_children = Vec::new();
552            for mut child in children {
553                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
554                if relation.attach {
555                    child
556                        .values
557                        .insert(relation.foreign_key.clone(), local_value.clone());
558                }
559                let child_repo = self.scoped_repository(child.entity.clone());
560                saved_children.push(child_repo.insert_graph_node_scoped(child, active_scope)?);
561            }
562            node.relations.insert(name, saved_children);
563        }
564
565        Ok(node)
566    }
567
568    fn upsert_graph_node_scoped<'s>(
569        &self,
570        mut node: GraphNode,
571        parent_scope: Option<&'s ScopedCommentNode<'s>>,
572    ) -> Result<GraphNode, RepositoryError<E::Error>> {
573        // Create scope node on the current stack frame if this node has a comment
574        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
575            parent: parent_scope,
576            track: teaql_core::TraceNode {
577                entity_type: node.entity.clone(),
578                entity_id: node
579                    .id()
580                    .and_then(|v| match v {
581                        Value::U64(n) => Some(*n),
582                        Value::I64(n) => Some(*n as u64),
583                        _ => None,
584                    }),
585                comment: c.clone(),
586            },
587        });
588        let active_scope = current_scope.as_ref().or(parent_scope);
589
590        match node.operation {
591            GraphOperation::Upsert | GraphOperation::Create => {}
592            GraphOperation::Reference => return self.validate_reference_node(node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()),
593            GraphOperation::Remove => {
594                self.validate_remove_node(&node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default())?;
595                self.delete_graph_node(&node, parent_scope)?;
596                return Ok(node);
597            }
598        }
599
600        let descriptor = self
601            .repository
602            .metadata
603            .context
604            .require_entity(&node.entity)
605            .map_err(RepositoryError::Runtime)?;
606        let Some(id_property) = descriptor.id_property() else {
607            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
608                "entity {} has no id property for graph upsert",
609                node.entity
610            ))));
611        };
612        let Some(id) = node
613            .values
614            .get(&id_property.name)
615            .filter(|value| !is_unassigned_id_value(value))
616            .cloned()
617        else {
618            // Strip comment to prevent duplicate scope — already captured in active_scope
619            node.comment = None;
620            return self.insert_graph_node_scoped(node, active_scope);
621        };
622
623        if node.operation == GraphOperation::Create || self
624            .fetch_graph_current_row(&node.entity, &id_property.name, &id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default())?
625            .is_none()
626        {
627            node.comment = None;
628            return self.insert_graph_node_scoped(node, active_scope);
629        }
630
631        let mut one_relations = Vec::new();
632        let mut many_relations = Vec::new();
633        for (name, children) in std::mem::take(&mut node.relations) {
634            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
635                RepositoryError::Runtime(RuntimeError::MissingRelation {
636                    entity: node.entity.clone(),
637                    relation: name.clone(),
638                })
639            })?;
640            if relation.many {
641                many_relations.push((name, relation.clone(), children));
642            } else {
643                one_relations.push((name, relation.clone(), children));
644            }
645        }
646
647        for (name, relation, children) in one_relations {
648            if children.len() > 1 {
649                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
650                    "relation {}.{} expects one child, got {}",
651                    node.entity,
652                    name,
653                    children.len()
654                ))));
655            }
656            let mut saved_children = Vec::new();
657            for child in children {
658                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
659                let child_repo = self.scoped_repository(child.entity.clone());
660                let saved_child = child_repo.upsert_graph_node_scoped(child, active_scope)?;
661                if relation.attach {
662                    let foreign_value = saved_child
663                        .values
664                        .get(&relation.foreign_key)
665                        .cloned()
666                        .ok_or_else(|| {
667                            RepositoryError::Runtime(RuntimeError::Graph(format!(
668                                "saved child {} missing foreign key {} for relation {}.{}",
669                                relation.target_entity, relation.foreign_key, node.entity, name
670                            )))
671                        })?;
672                    node.values
673                        .insert(relation.local_key.clone(), foreign_value);
674                }
675                saved_children.push(saved_child);
676            }
677            node.relations.insert(name, saved_children);
678        }
679
680        let update = self.graph_update_command(&mut node, descriptor, id_property, &id)?;
681        if !update.values.is_empty() || update.expected_version.is_some() {
682            let prepared_update = self
683                .prepare_update_command(&update)
684                .map_err(RepositoryError::Runtime)?;
685            let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
686            self.execute_prepared_update_with_comment(prepared_update.clone(), lineage)?;
687            for (field, value) in &prepared_update.values {
688                node.values.insert(field.clone(), value.clone());
689            }
690            if let Some(version_property) = descriptor.version_property() {
691                if let Some(expected_version) = prepared_update.expected_version {
692                    node.values.insert(
693                        version_property.name.clone(),
694                        Value::I64(expected_version + 1),
695                    );
696                }
697            }
698        }
699
700        for (name, relation, children) in many_relations {
701            let local_value = node
702                .values
703                .get(&relation.local_key)
704                .cloned()
705                .ok_or_else(|| {
706                    RepositoryError::Runtime(RuntimeError::Graph(format!(
707                        "parent {} missing local key {} for relation {}",
708                        node.entity, relation.local_key, name
709                    )))
710                })?;
711            let child_repo = self.scoped_repository(relation.target_entity.clone());
712            let child_descriptor = self
713                .repository
714                .metadata
715                .context
716                .require_entity(&relation.target_entity)
717                .map_err(RepositoryError::Runtime)?;
718            let child_id_property = child_descriptor.id_property().ok_or_else(|| {
719                RepositoryError::Runtime(RuntimeError::Graph(format!(
720                    "entity {} has no id property",
721                    relation.target_entity
722                )))
723            })?;
724
725            let mut seen = std::collections::BTreeSet::new();
726            let mut saved_children = Vec::new();
727            for mut child in children {
728                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
729                if relation.attach && child.operation != GraphOperation::Reference {
730                    child
731                        .values
732                        .insert(relation.foreign_key.clone(), local_value.clone());
733                }
734                if let Some(child_id) = child
735                    .values
736                    .get(&child_id_property.name)
737                    .filter(|value| !is_unassigned_id_value(value))
738                {
739                    let key = graph_identity_key(child_id);
740                    if !seen.insert(key.clone()) {
741                        return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
742                            "duplicate child id {key} in relation {}.{}",
743                            node.entity, name
744                        ))));
745                    }
746                }
747                saved_children.push(child_repo.upsert_graph_node_scoped(child, active_scope)?);
748            }
749
750
751
752            node.relations.insert(name, saved_children);
753        }
754
755        Ok(node)
756    }
757
758    fn validate_reference_node(
759        &self,
760        node: GraphNode,
761        trace_chain: Vec<teaql_core::TraceNode>,
762    ) -> Result<GraphNode, RepositoryError<E::Error>> {
763        if !node.relations.is_empty() {
764            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
765                "reference node {} cannot contain child relations",
766                node.entity
767            ))));
768        }
769        let descriptor = self
770            .repository
771            .metadata
772            .context
773            .require_entity(&node.entity)
774            .map_err(RepositoryError::Runtime)?;
775        let id_property = descriptor.id_property().ok_or_else(|| {
776            RepositoryError::Runtime(RuntimeError::Graph(format!(
777                "entity {} has no id property for graph reference",
778                node.entity
779            )))
780        })?;
781        let id = node
782            .values
783            .get(&id_property.name)
784            .filter(|value| !is_unassigned_id_value(value))
785            .cloned()
786            .ok_or_else(|| {
787                RepositoryError::Runtime(RuntimeError::Graph(format!(
788                    "reference node {} missing id property {}",
789                    node.entity, id_property.name
790                )))
791            })?;
792
793        for field in node.values.keys() {
794            if field == &id_property.name {
795                continue;
796            }
797            if descriptor
798                .version_property()
799                .map(|property| field == &property.name)
800                .unwrap_or(false)
801            {
802                continue;
803            }
804            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
805                "reference node {} cannot carry mutable field {}",
806                node.entity, field
807            ))));
808        }
809
810        let current = self
811            .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain)?
812            .ok_or_else(|| {
813                RepositoryError::Runtime(RuntimeError::Graph(format!(
814                    "reference node {}({}) does not exist",
815                    node.entity,
816                    graph_identity_key(&id)
817                )))
818            })?;
819
820        if let Some(version_property) = descriptor.version_property() {
821            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
822                if *existing_version < 0 {
823                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
824                        "reference node {}({}) is deleted",
825                        node.entity,
826                        graph_identity_key(&id)
827                    ))));
828                }
829                if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
830                {
831                    if expected_version != existing_version {
832                        return Err(RepositoryError::Runtime(
833                            RuntimeError::OptimisticLockConflict {
834                                entity: node.entity,
835                                id: graph_identity_key(&id),
836                            },
837                        ));
838                    }
839                }
840            }
841        }
842
843        Ok(GraphNode {
844            entity: node.entity,
845            values: current,
846            relations: BTreeMap::new(),
847            operation: GraphOperation::Reference,
848            comment: None,
849            dirty_fields: None, original_values: None,
850        })
851    }
852
853    fn validate_remove_node(&self, node: &GraphNode, trace_chain: Vec<teaql_core::TraceNode>) -> Result<(), RepositoryError<E::Error>> {
854        if !node.relations.is_empty() {
855            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
856                "remove node {} cannot contain child relations",
857                node.entity
858            ))));
859        }
860        let descriptor = self
861            .repository
862            .metadata
863            .context
864            .require_entity(&node.entity)
865            .map_err(RepositoryError::Runtime)?;
866        let id_property = descriptor.id_property().ok_or_else(|| {
867            RepositoryError::Runtime(RuntimeError::Graph(format!(
868                "entity {} has no id property for graph remove",
869                node.entity
870            )))
871        })?;
872        let id = node
873            .values
874            .get(&id_property.name)
875            .filter(|value| !is_unassigned_id_value(value))
876            .cloned()
877            .ok_or_else(|| {
878                RepositoryError::Runtime(RuntimeError::Graph(format!(
879                    "remove node {} missing id property {}",
880                    node.entity, id_property.name
881                )))
882            })?;
883        let current = self
884            .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain)?
885            .ok_or_else(|| {
886                RepositoryError::Runtime(RuntimeError::Graph(format!(
887                    "remove node {}({}) does not exist",
888                    node.entity,
889                    graph_identity_key(&id)
890                )))
891            })?;
892        if let Some(version_property) = descriptor.version_property() {
893            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
894                if *existing_version < 0 {
895                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
896                        "remove node {}({}) is already deleted",
897                        node.entity,
898                        graph_identity_key(&id)
899                    ))));
900                }
901            }
902        }
903        Ok(())
904    }
905
906    fn graph_node_from_record(
907        &self,
908        entity: &str,
909        record: Record,
910    ) -> Result<GraphNode, RuntimeError> {
911        let descriptor = self.repository.metadata.context.require_entity(entity)?;
912        let mut node = GraphNode::new(entity);
913
914        for (field, value) in record {
915            if field == "_comment" {
916                if let Value::Text(comment) = value {
917                    node.set_comment(comment);
918                }
919                continue;
920            }
921            let Some(relation) = descriptor.relation_by_name(&field) else {
922                node.values.insert(field, value);
923                continue;
924            };
925
926            match value {
927                Value::Null => {
928                    node.relations.entry(field).or_default();
929                }
930                Value::Object(record) => {
931                    let child = self.graph_node_from_record(&relation.target_entity, record)?;
932                    node.relations.entry(field).or_default().push(child);
933                }
934                Value::List(values) => {
935                    let children = node.relations.entry(field.clone()).or_default();
936                    for value in values {
937                        let Value::Object(record) = value else {
938                            return Err(RuntimeError::Graph(format!(
939                                "relation {}.{} expects object children, got {:?}",
940                                entity, field, value
941                            )));
942                        };
943                        children
944                            .push(self.graph_node_from_record(&relation.target_entity, record)?);
945                    }
946                }
947                other => {
948                    return Err(RuntimeError::Graph(format!(
949                        "relation {}.{} expects object/list/null, got {:?}",
950                        entity, field, other
951                    )));
952                }
953            }
954        }
955
956        Ok(node)
957    }
958
959    fn graph_update_command(
960        &self,
961        node: &mut GraphNode,
962        descriptor: &EntityDescriptor,
963        id_property: &PropertyDescriptor,
964        id: &Value,
965    ) -> Result<UpdateCommand, RepositoryError<E::Error>> {
966        crate::mark_record_status(&mut node.values, crate::CheckObjectStatus::Update);
967        let check_result = self
968            .repository
969            .metadata
970            .context
971            .check_and_fix_record(&node.entity, &mut node.values);
972        crate::clear_record_status(&mut node.values);
973        check_result.map_err(RepositoryError::Runtime)?;
974
975        let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
976        command.old_values = node.original_values.clone();
977        if let Some(version_property) = descriptor.version_property() {
978            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
979                command = command.expected_version(*version);
980            }
981        }
982        // Filter properties by dirty_fields when available (Java-style minimal UPDATE).
983        // When dirty_fields is Some, only modified fields are included in the SET clause.
984        // When dirty_fields is None (no tracking), fall back to all fields in node.values.
985        for property in descriptor.properties.iter().filter(|property| {
986            !property.is_id
987                && !property.is_version
988                && property.name != id_property.name
989                && match &node.dirty_fields {
990                    Some(dirty) => dirty.contains(&property.name),
991                    None => node.values.contains_key(&property.name),
992                }
993        }) {
994            if let Some(value) = node.values.get(&property.name) {
995                command.values.insert(property.name.clone(), value.clone());
996            }
997        }
998        Ok(command)
999    }
1000
1001    fn delete_graph_node<'s>(
1002        &self,
1003        node: &GraphNode,
1004        parent_scope: Option<&'s ScopedCommentNode<'s>>,
1005    ) -> Result<u64, RepositoryError<E::Error>> {
1006        let descriptor = self
1007            .repository
1008            .metadata
1009            .context
1010            .require_entity(&node.entity)
1011            .map_err(RepositoryError::Runtime)?;
1012        let id_property = descriptor.id_property().ok_or_else(|| {
1013            RepositoryError::Runtime(RuntimeError::Graph(format!(
1014                "entity {} has no id property for graph remove",
1015                node.entity
1016            )))
1017        })?;
1018        let id = node
1019            .values
1020            .get(&id_property.name)
1021            .filter(|value| !is_unassigned_id_value(value))
1022            .cloned()
1023            .ok_or_else(|| {
1024                RepositoryError::Runtime(RuntimeError::Graph(format!(
1025                    "remove node {} missing id property {}",
1026                    node.entity, id_property.name
1027                )))
1028            })?;
1029        let mut delete = DeleteCommand::new(node.entity.clone(), id);
1030        if let Some(version_property) = descriptor.version_property() {
1031            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
1032                delete = delete.expected_version(*version);
1033            }
1034        }
1035
1036        // Create scope node for deletion if parent/node comment is present
1037        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
1038            parent: parent_scope,
1039            track: teaql_core::TraceNode {
1040                entity_type: node.entity.clone(),
1041                entity_id: node
1042                    .id()
1043                    .and_then(|v| match v {
1044                        Value::U64(n) => Some(*n),
1045                        Value::I64(n) => Some(*n as u64),
1046                        _ => None,
1047                    }),
1048                comment: c.clone(),
1049            },
1050        });
1051        let active_scope = current_scope.as_ref().or(parent_scope);
1052        let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
1053
1054        self.delete_scoped(&delete, lineage)
1055    }
1056
1057    pub fn fetch_graph_current_row(
1058        &self,
1059        entity: &str,
1060        id_property: &str,
1061        id: &Value,
1062        trace_chain: Vec<teaql_core::TraceNode>,
1063    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
1064        let mut query = SelectQuery::new(entity).filter(Expr::eq(id_property, id.clone()));
1065        query.trace_chain = trace_chain;
1066        let mut rows = self
1067            .scoped_repository(entity.to_owned())
1068            .fetch_all(&query)?;
1069        Ok(rows.pop())
1070    }
1071
1072    fn fetch_graph_children(
1073        &self,
1074        entity: &str,
1075        foreign_key: &str,
1076        parent_value: &Value,
1077        trace_chain: Vec<teaql_core::TraceNode>,
1078    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
1079        let mut query = SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone()));
1080        query.trace_chain = trace_chain;
1081        self.scoped_repository(entity.to_owned()).fetch_all(&query)
1082    }
1083}