// teaql_runtime/repository/graph.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use teaql_core::{
5    DeleteCommand, Entity, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record,
6    SelectQuery, UpdateCommand, Value,
7};
8
9use crate::{
10    GraphMutationKind, GraphMutationPlan, GraphNode, GraphOperation, RepositoryError,
11    RuntimeError, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
12};
13use crate::entity_status::EntityStatus;
14
15use super::{ResolvedRepository, helpers::*};
16
17impl<'a, E> ResolvedRepository<'a, E>
18where
19    E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
20{
21    pub async fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
22        if node.entity != self.entity {
23            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
24                "resolved repository {} cannot save graph root {}",
25                self.entity, node.entity
26            ))));
27        }
28        self.upsert_graph_node_scoped(node, None).await
29    }
30
31    pub async fn save_entity_graph_from(&self, graph: teaql_core::EntityGraph) -> Result<GraphNode, RepositoryError<E::Error>> {
32        fn convert(node: teaql_core::EntityGraphNode) -> GraphNode {
33            let mut relations = BTreeMap::new();
34            for (rel_name, child) in node.children {
35                relations.entry(rel_name).or_insert_with(Vec::new).push(convert(child));
36            }
37            GraphNode {
38                entity: node.entity_type,
39                values: node.record,
40                relations,
41                operation: match node.operation {
42                    teaql_core::EntityGraphOperation::Save => crate::GraphOperation::Upsert,
43                    teaql_core::EntityGraphOperation::Delete => crate::GraphOperation::Remove,
44                },
45                comment: node.comment,
46                dirty_fields: None, original_values: None,
47            }
48        }
49        self.save_graph(convert(graph.root)).await
50    }
51
52    pub async fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
53    where
54        T: Entity,
55    {
56        let node = self
57            .graph_node_from_entity(entity)
58            .map_err(RepositoryError::Runtime)?;
59        self.save_graph(node).await
60    }
61
62    pub async fn save_entity<T>(&self, entity: T, status: EntityStatus) -> Result<GraphNode, RepositoryError<E::Error>>
63    where
64        T: Entity,
65    {
66        if !status.need_persist() {
67            return Ok(GraphNode::new(&self.entity));
68        }
69        if status.is_deleted() {
70            let mut node = self.graph_node_from_entity(entity)
71                .map_err(RepositoryError::Runtime)?;
72            node.operation = GraphOperation::Remove;
73            node.relations.clear();
74            self.save_graph(node).await
75        } else {
76            self.save_entity_graph(entity).await
77        }
78    }
79    pub async fn save_entity_with_comment<T>(&self, entity: T, status: EntityStatus, comment: impl Into<String>) -> Result<GraphNode, RepositoryError<E::Error>>
80    where
81        T: Entity,
82    {
83        if status.is_deleted() {
84            let mut node = self.graph_node_from_entity(entity)
85                .map_err(RepositoryError::Runtime)?;
86            node.operation = GraphOperation::Remove;
87            node.relations.clear();
88            node.set_comment(comment);
89            self.save_graph(node).await
90        } else {
91            self.save_entity_graph_with_comment(entity, comment).await
92        }
93    }
94    pub async fn save_entity_graph_with_comment<T>(
95        &self,
96        entity: T,
97        comment: impl Into<String>,
98    ) -> Result<GraphNode, RepositoryError<E::Error>>
99    where
100        T: Entity,
101    {
102        let mut node = self
103            .graph_node_from_entity(entity)
104            .map_err(RepositoryError::Runtime)?;
105        node.set_comment(comment);
106        self.save_graph(node).await
107    }
108
109    /// Create a new entity graph with an annotation comment on the root node.
110    /// This assumes all new nodes do not exist in the database, skipping existence checks
111    /// and throwing an exception on primary key conflict.
112    pub async fn create_entity_graph_with_comment<T>(
113        &self,
114        entity: T,
115        comment: impl Into<String>,
116    ) -> Result<GraphNode, RepositoryError<E::Error>>
117    where
118        T: Entity,
119    {
120        let mut node = self
121            .graph_node_from_entity(entity)
122            .map_err(RepositoryError::Runtime)?;
123        node.operation = GraphOperation::Create;
124        node.set_comment(comment);
125        self.save_graph(node).await
126    }
127
128    pub async fn plan_graph(
129        &self,
130        node: GraphNode,
131    ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
132        if node.entity != self.entity {
133            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
134                "resolved repository {} cannot plan graph root {}",
135                self.entity, node.entity
136            ))));
137        }
138        let mut node = node;
139        let mut plan = GraphMutationPlan::default();
140        self.collect_graph_plan(&mut node, &mut plan, None, None, false).await?;
141        plan.planned_root = Some(node);
142        plan.rebuild_batches();
143        Ok(plan)
144    }
145
146    pub async fn execute_graph_plan(
147        &self,
148        plan: GraphMutationPlan,
149    ) -> Result<GraphNode, RepositoryError<E::Error>> {
150        let Some(root) = plan.planned_root else {
151            return Err(RepositoryError::Runtime(RuntimeError::Graph(
152                "graph mutation plan has no planned root".to_owned(),
153            )));
154        };
155
156        for batch in plan.batches {
157            if batch.items.is_empty() {
158                continue;
159            }
160            match batch.kind {
161                GraphMutationKind::Create => {
162                    let mut cmd = teaql_core::BatchInsertCommand::new(&batch.entity);
163                    for item in batch.items {
164                        cmd.batch_values.push(item.values);
165                        if let Some(token) = item.scope_token {
166                            cmd.trace_chains.push(token.recover_trace_chain());
167                        } else {
168                            cmd.trace_chains.push(Vec::new());
169                        }
170                    }
171                    self.execute_prepared_batch_insert(cmd).await?;
172                }
173                GraphMutationKind::Update => {
174                    let mut cmd = teaql_core::BatchUpdateCommand::new(&batch.entity, batch.update_fields);
175                    for item in batch.items {
176                        let id = item.values.get("id").cloned().ok_or_else(|| {
177                            RepositoryError::Runtime(RuntimeError::Graph(format!(
178                                "update item in batch missing id for {}", batch.entity
179                            )))
180                        })?;
181                        let version = item.values.get("version").and_then(|v| {
182                            if let teaql_core::Value::I64(n) = v { Some(*n) } else { None }
183                        });
184                        cmd.batch_values.push(item.values);
185                        cmd.batch_ids.push(id);
186                        cmd.batch_expected_versions.push(version);
187                        cmd.batch_old_values.push(item.old_values);
188                        if let Some(token) = item.scope_token {
189                            cmd.trace_chains.push(token.recover_trace_chain());
190                        } else {
191                            cmd.trace_chains.push(Vec::new());
192                        }
193                    }
194                    self.execute_prepared_batch_update(cmd).await?;
195                }
196                GraphMutationKind::Delete => {
197                    // For now, loop individually since we lack BatchDeleteCommand
198                    for item in batch.items {
199                        let id = item.values.get("id").cloned().ok_or_else(|| {
200                            RepositoryError::Runtime(RuntimeError::Graph(format!(
201                                "delete item in batch missing id for {}", batch.entity
202                            )))
203                        })?;
204                        let mut cmd = teaql_core::DeleteCommand::new(&batch.entity, id);
205                        if let Some(teaql_core::Value::I64(version)) = item.values.get("version") {
206                            cmd = cmd.expected_version(*version);
207                        }
208                        let trace_chain = if let Some(token) = item.scope_token { token.recover_trace_chain() } else { Vec::new() };
209                        self.delete_scoped(&cmd, trace_chain).await?;
210                    }
211                }
212                GraphMutationKind::Reference => {
213                    // References are skipped in execution, they only validate during traversal
214                }
215            }
216        }
217
218        Ok(root)
219    }
220
221    pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
222    where
223        T: Entity,
224    {
225        let descriptor = T::entity_descriptor();
226        if descriptor.name != self.entity {
227            return Err(RuntimeError::Graph(format!(
228                "resolved repository {} cannot extract graph root {}",
229                self.entity, descriptor.name
230            )));
231        }
232        // Extract dirty field names BEFORE into_record() consumes the entity.
233        // This is the Rust equivalent of Java's entity.getUpdatedProperties().
234        let dirty_fields = entity.dirty_fields();
235        let original_values = entity.original_values();
236        let is_deleted = entity.is_marked_as_delete();
237        let comment = entity.get_comment();
238        let mut node = self.graph_node_from_record(&descriptor.name, entity.into_record())?;
239        node.dirty_fields = dirty_fields;
240        node.original_values = original_values;
241        if is_deleted {
242            node.operation = GraphOperation::Remove;
243            node.relations.clear();
244        }
245        if let Some(c) = comment {
246            node.set_comment(c);
247        }
248        Ok(node)
249    }
250
251    fn collect_graph_plan<'b, 's: 'b>(
252        &'b self,
253        node: &'b mut GraphNode,
254        plan: &'b mut GraphMutationPlan,
255        parent_scope: Option<&'s ScopedCommentNode<'s>>,
256        parent_token: Option<Arc<TraceScopeToken>>,
257        parent_is_create: bool,
258    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + '_>> {
259        Box::pin(async move {
260        match node.operation {
261            GraphOperation::Reference => {
262                plan.push(
263                    node.entity.clone(),
264                    GraphMutationKind::Reference,
265                    node.values.clone(),
266                    Vec::new(),
267                    parent_token,
268                    node.original_values.clone(),
269                );
270                return Ok(());
271            }
272            GraphOperation::Remove => {
273                plan.push(
274                    node.entity.clone(),
275                    GraphMutationKind::Delete,
276                    node.values.clone(),
277                    Vec::new(),
278                    parent_token,
279                    node.original_values.clone(),
280                );
281                return Ok(());
282            }
283            GraphOperation::Upsert | GraphOperation::Create => {}
284        }
285
286        let descriptor = self
287            .repository
288            .metadata
289            .context
290            .require_entity(&node.entity)
291            .map_err(RepositoryError::Runtime)?;
292
293        // Create scope node on the current stack frame if this node has a comment
294        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
295            parent: parent_scope,
296            track: teaql_core::TraceNode {
297                entity_type: node.entity.clone(),
298                entity_id: node.id().and_then(|v| match v {
299                    Value::U64(n) => Some(*n),
300                    Value::I64(n) => Some(*n as u64),
301                    _ => None,
302                }),
303                comment: c.clone(),
304            },
305        });
306        let active_scope = current_scope.as_ref().or(parent_scope);
307
308        let id_property = descriptor.id_property().cloned();
309        let id = id_property.as_ref().and_then(|property| {
310            node.values
311                .get(&property.name)
312                .filter(|value| !is_unassigned_id_value(value))
313                .cloned()
314        });
315
316        let is_create_op = node.operation == GraphOperation::Create || (parent_is_create && node.operation == GraphOperation::Upsert);
317
318        let is_update = if is_create_op {
319            false
320        } else {
321            match (id_property.as_ref(), id.as_ref()) {
322                (Some(id_property), Some(id)) => self
323                    .fetch_graph_current_row(&node.entity, &id_property.name, id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?
324                    .is_some(),
325                _ => false,
326            }
327        };
328        if !is_update {
329            if let Some(id_property) = id_property.as_ref() {
330                let needs_id = !node.values.contains_key(&id_property.name)
331                    || node
332                        .values
333                        .get(&id_property.name)
334                        .is_some_and(is_unassigned_id_value);
335                if needs_id {
336                    let id = self
337                        .repository
338                        .metadata
339                        .context
340                        .next_id(&node.entity)
341                        .map_err(RepositoryError::Runtime)?;
342                    node.values.insert(id_property.name.clone(), Value::U64(id));
343                }
344            }
345            ensure_initial_version(&mut node.values, descriptor);
346        }
347        let update_fields = if is_update {
348            let mut excluded = Vec::new();
349            if let Some(id_property) = id_property.as_ref() {
350                excluded.push(id_property.name.clone());
351            }
352            if let Some(version_property) = descriptor.version_property() {
353                excluded.push(version_property.name.clone());
354            }
355            let mut fields = sorted_update_fields(&node.values, excluded);
356            if let Some(dirty) = &node.dirty_fields {
357                fields.retain(|f| dirty.contains(f));
358            }
359            fields
360        } else {
361            Vec::new()
362        };
363
364        // Build the TraceScopeToken for this node (only if it has a comment).
365        // This is an Arc-linked persistent list: zero-copy, O(1) creation.
366        let current_token = if let Some(c) = &node.comment {
367            Some(Arc::new(TraceScopeToken {
368                parent: parent_token.clone(),
369                track: teaql_core::TraceNode {
370                    entity_type: node.entity.clone(),
371                    entity_id: node.id().and_then(|v| match v {
372                        Value::U64(n) => Some(*n),
373                        Value::I64(n) => Some(*n as u64),
374                        _ => None,
375                    }),
376                    comment: c.clone(),
377                },
378                node_index: plan.next_item_index,
379            }))
380        } else {
381            parent_token.clone()
382        };
383
384        plan.push(
385            node.entity.clone(),
386            if is_update {
387                GraphMutationKind::Update
388            } else {
389                GraphMutationKind::Create
390            },
391            node.values.clone(),
392            update_fields,
393            current_token.clone(),
394            node.original_values.clone(),
395        );
396
397        for (name, children) in &mut node.relations {
398            let relation = descriptor.relation_by_name(name).ok_or_else(|| {
399                RepositoryError::Runtime(RuntimeError::MissingRelation {
400                    entity: node.entity.clone(),
401                    relation: name.clone(),
402                })
403            })?;
404            let child_repo = self.scoped_repository(relation.target_entity.clone());
405            for child in children {
406                ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
407                child_repo.collect_graph_plan(child, plan, active_scope, current_token.clone(), is_create_op).await?;
408            }
409        }
410        Ok(())
411        })
412    }
413
414    fn insert_graph_node_scoped<'b, 's: 'b>(
415        &'b self,
416        mut node: GraphNode,
417        parent_scope: Option<&'s ScopedCommentNode<'s>>,
418    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<GraphNode, RepositoryError<E::Error>>> + Send + '_>> {
419        Box::pin(async move {
420        match node.operation {
421            GraphOperation::Upsert | GraphOperation::Create => {}
422            GraphOperation::Reference => return self.validate_reference_node(node, parent_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await,
423            GraphOperation::Remove => {
424                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
425                    "create graph cannot remove node {}",
426                    node.entity
427                ))));
428            }
429        }
430
431        // Create scope node on the current stack frame if this node has a comment
432        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
433            parent: parent_scope,
434            track: teaql_core::TraceNode {
435                entity_type: node.entity.clone(),
436                entity_id: node
437                    .id()
438                    .and_then(|v| match v {
439                        Value::U64(n) => Some(*n),
440                        Value::I64(n) => Some(*n as u64),
441                        _ => None,
442                    }),
443                comment: c.clone(),
444            },
445        });
446        let active_scope = current_scope.as_ref().or(parent_scope);
447
448        let descriptor = self
449            .repository
450            .metadata
451            .context
452            .require_entity(&node.entity)
453            .map_err(RepositoryError::Runtime)?;
454
455        let mut one_relations = Vec::new();
456        let mut many_relations = Vec::new();
457        for (name, children) in std::mem::take(&mut node.relations) {
458            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
459                RepositoryError::Runtime(RuntimeError::MissingRelation {
460                    entity: node.entity.clone(),
461                    relation: name.clone(),
462                })
463            })?;
464            if relation.many {
465                many_relations.push((name, relation.clone(), children));
466            } else {
467                one_relations.push((name, relation.clone(), children));
468            }
469        }
470
471        for (name, relation, children) in one_relations {
472            if children.len() > 1 {
473                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
474                    "relation {}.{} expects one child, got {}",
475                    node.entity,
476                    name,
477                    children.len()
478                ))));
479            }
480            let mut saved_children = Vec::new();
481            for child in children {
482                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
483                let child_repo = self.scoped_repository(child.entity.clone());
484                let saved_child = child_repo.insert_graph_node_scoped(child, active_scope).await?;
485                if relation.attach {
486                    let foreign_value = saved_child
487                        .values
488                        .get(&relation.foreign_key)
489                        .cloned()
490                        .ok_or_else(|| {
491                            RepositoryError::Runtime(RuntimeError::Graph(format!(
492                                "saved child {} missing foreign key {} for relation {}.{}",
493                                relation.target_entity, relation.foreign_key, node.entity, name
494                            )))
495                        })?;
496                    node.values
497                        .insert(relation.local_key.clone(), foreign_value);
498                }
499                saved_children.push(saved_child);
500            }
501            node.relations.insert(name, saved_children);
502        }
503
504        let command = self
505            .prepare_insert_command(&InsertCommand {
506                entity: node.entity.clone(),
507                values: node.values.clone(),
508                trace_chain: Vec::new(),
509            })
510            .map_err(RepositoryError::Runtime)?;
511        let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
512        self.execute_prepared_insert_with_comment(command.clone(), lineage).await?;
513        node.values = command.values;
514
515        for (name, relation, children) in many_relations {
516            let local_value = node
517                .values
518                .get(&relation.local_key)
519                .cloned()
520                .ok_or_else(|| {
521                    RepositoryError::Runtime(RuntimeError::Graph(format!(
522                        "parent {} missing local key {} for relation {}",
523                        node.entity, relation.local_key, name
524                    )))
525                })?;
526            let mut saved_children = Vec::new();
527            for mut child in children {
528                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
529                if relation.attach {
530                    child
531                        .values
532                        .insert(relation.foreign_key.clone(), local_value.clone());
533                }
534                let child_repo = self.scoped_repository(child.entity.clone());
535                saved_children.push(child_repo.insert_graph_node_scoped(child, active_scope).await?);
536            }
537            node.relations.insert(name, saved_children);
538        }
539
540        Ok(node)
541        })
542    }
543
544    fn upsert_graph_node_scoped<'b, 's: 'b>(
545        &'b self,
546        mut node: GraphNode,
547        parent_scope: Option<&'s ScopedCommentNode<'s>>,
548    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<GraphNode, RepositoryError<E::Error>>> + Send + '_>> {
549        Box::pin(async move {
550        // Create scope node on the current stack frame if this node has a comment
551        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
552            parent: parent_scope,
553            track: teaql_core::TraceNode {
554                entity_type: node.entity.clone(),
555                entity_id: node
556                    .id()
557                    .and_then(|v| match v {
558                        Value::U64(n) => Some(*n),
559                        Value::I64(n) => Some(*n as u64),
560                        _ => None,
561                    }),
562                comment: c.clone(),
563            },
564        });
565        let active_scope = current_scope.as_ref().or(parent_scope);
566
567        match node.operation {
568            GraphOperation::Upsert | GraphOperation::Create => {}
569            GraphOperation::Reference => return self.validate_reference_node(node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await,
570            GraphOperation::Remove => {
571                self.validate_remove_node(&node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?;
572                self.delete_graph_node(&node, parent_scope).await?;
573                return Ok(node);
574            }
575        }
576
577        let descriptor = self
578            .repository
579            .metadata
580            .context
581            .require_entity(&node.entity)
582            .map_err(RepositoryError::Runtime)?;
583        let Some(id_property) = descriptor.id_property() else {
584            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
585                "entity {} has no id property for graph upsert",
586                node.entity
587            ))));
588        };
589        let Some(id) = node
590            .values
591            .get(&id_property.name)
592            .filter(|value| !is_unassigned_id_value(value))
593            .cloned()
594        else {
595            // Strip comment to prevent duplicate scope — already captured in active_scope
596            node.comment = None;
597            return self.insert_graph_node_scoped(node, active_scope).await;
598        };
599
600        if node.operation == GraphOperation::Create || self
601            .fetch_graph_current_row(&node.entity, &id_property.name, &id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?
602            .is_none()
603        {
604            node.comment = None;
605            return self.insert_graph_node_scoped(node, active_scope).await;
606        }
607
608        let mut one_relations = Vec::new();
609        let mut many_relations = Vec::new();
610        for (name, children) in std::mem::take(&mut node.relations) {
611            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
612                RepositoryError::Runtime(RuntimeError::MissingRelation {
613                    entity: node.entity.clone(),
614                    relation: name.clone(),
615                })
616            })?;
617            if relation.many {
618                many_relations.push((name, relation.clone(), children));
619            } else {
620                one_relations.push((name, relation.clone(), children));
621            }
622        }
623
624        for (name, relation, children) in one_relations {
625            if children.len() > 1 {
626                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
627                    "relation {}.{} expects one child, got {}",
628                    node.entity,
629                    name,
630                    children.len()
631                ))));
632            }
633            let mut saved_children = Vec::new();
634            for child in children {
635                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
636                let child_repo = self.scoped_repository(child.entity.clone());
637                let saved_child = child_repo.upsert_graph_node_scoped(child, active_scope).await?;
638                if relation.attach {
639                    let foreign_value = saved_child
640                        .values
641                        .get(&relation.foreign_key)
642                        .cloned()
643                        .ok_or_else(|| {
644                            RepositoryError::Runtime(RuntimeError::Graph(format!(
645                                "saved child {} missing foreign key {} for relation {}.{}",
646                                relation.target_entity, relation.foreign_key, node.entity, name
647                            )))
648                        })?;
649                    node.values
650                        .insert(relation.local_key.clone(), foreign_value);
651                }
652                saved_children.push(saved_child);
653            }
654            node.relations.insert(name, saved_children);
655        }
656
657        let update = self.graph_update_command(&mut node, descriptor, id_property, &id)?;
658        if !update.values.is_empty() {
659            let prepared_update = self
660                .prepare_update_command(&update)
661                .map_err(RepositoryError::Runtime)?;
662            let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
663            self.execute_prepared_update_with_comment(prepared_update.clone(), lineage).await?;
664            for (field, value) in &prepared_update.values {
665                node.values.insert(field.clone(), value.clone());
666            }
667            if let Some(version_property) = descriptor.version_property() {
668                if let Some(expected_version) = prepared_update.expected_version {
669                    node.values.insert(
670                        version_property.name.clone(),
671                        Value::I64(expected_version + 1),
672                    );
673                }
674            }
675        }
676
677        for (name, relation, children) in many_relations {
678            let local_value = node
679                .values
680                .get(&relation.local_key)
681                .cloned()
682                .ok_or_else(|| {
683                    RepositoryError::Runtime(RuntimeError::Graph(format!(
684                        "parent {} missing local key {} for relation {}",
685                        node.entity, relation.local_key, name
686                    )))
687                })?;
688            let child_repo = self.scoped_repository(relation.target_entity.clone());
689            let child_descriptor = self
690                .repository
691                .metadata
692                .context
693                .require_entity(&relation.target_entity)
694                .map_err(RepositoryError::Runtime)?;
695            let child_id_property = child_descriptor.id_property().ok_or_else(|| {
696                RepositoryError::Runtime(RuntimeError::Graph(format!(
697                    "entity {} has no id property",
698                    relation.target_entity
699                )))
700            })?;
701
702            let mut seen = std::collections::BTreeSet::new();
703            let mut saved_children = Vec::new();
704            for mut child in children {
705                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
706                if relation.attach && child.operation != GraphOperation::Reference {
707                    child
708                        .values
709                        .insert(relation.foreign_key.clone(), local_value.clone());
710                }
711                if let Some(child_id) = child
712                    .values
713                    .get(&child_id_property.name)
714                    .filter(|value| !is_unassigned_id_value(value))
715                {
716                    let key = graph_identity_key(child_id);
717                    if !seen.insert(key.clone()) {
718                        return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
719                            "duplicate child id {key} in relation {}.{}",
720                            node.entity, name
721                        ))));
722                    }
723                }
724                saved_children.push(child_repo.upsert_graph_node_scoped(child, active_scope).await?);
725            }
726
727
728
729            node.relations.insert(name, saved_children);
730        }
731
732        Ok(node)
733        })
734    }
735
736    async fn validate_reference_node(
737        &self,
738        node: GraphNode,
739        trace_chain: Vec<teaql_core::TraceNode>,
740    ) -> Result<GraphNode, RepositoryError<E::Error>> {
741        if !node.relations.is_empty() {
742            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
743                "reference node {} cannot contain child relations",
744                node.entity
745            ))));
746        }
747        let descriptor = self
748            .repository
749            .metadata
750            .context
751            .require_entity(&node.entity)
752            .map_err(RepositoryError::Runtime)?;
753        let id_property = descriptor.id_property().ok_or_else(|| {
754            RepositoryError::Runtime(RuntimeError::Graph(format!(
755                "entity {} has no id property for graph reference",
756                node.entity
757            )))
758        })?;
759        let id = node
760            .values
761            .get(&id_property.name)
762            .filter(|value| !is_unassigned_id_value(value))
763            .cloned()
764            .ok_or_else(|| {
765                RepositoryError::Runtime(RuntimeError::Graph(format!(
766                    "reference node {} missing id property {}",
767                    node.entity, id_property.name
768                )))
769            })?;
770
771        for field in node.values.keys() {
772            if field == &id_property.name {
773                continue;
774            }
775            if descriptor
776                .version_property()
777                .map(|property| field == &property.name)
778                .unwrap_or(false)
779            {
780                continue;
781            }
782            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
783                "reference node {} cannot carry mutable field {}",
784                node.entity, field
785            ))));
786        }
787
788        let current = self
789            .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain).await?
790            .ok_or_else(|| {
791                RepositoryError::Runtime(RuntimeError::Graph(format!(
792                    "reference node {}({}) does not exist",
793                    node.entity,
794                    graph_identity_key(&id)
795                )))
796            })?;
797
798        if let Some(version_property) = descriptor.version_property() {
799            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
800                if *existing_version < 0 {
801                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
802                        "reference node {}({}) is deleted",
803                        node.entity,
804                        graph_identity_key(&id)
805                    ))));
806                }
807                if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
808                {
809                    if expected_version != existing_version {
810                        return Err(RepositoryError::Runtime(
811                            RuntimeError::OptimisticLockConflict {
812                                entity: node.entity,
813                                id: graph_identity_key(&id),
814                            },
815                        ));
816                    }
817                }
818            }
819        }
820
821        Ok(GraphNode {
822            entity: node.entity,
823            values: current,
824            relations: BTreeMap::new(),
825            operation: GraphOperation::Reference,
826            comment: None,
827            dirty_fields: None, original_values: None,
828        })
829    }
830
831    async fn validate_remove_node(&self, node: &GraphNode, trace_chain: Vec<teaql_core::TraceNode>) -> Result<(), RepositoryError<E::Error>> {
832        if !node.relations.is_empty() {
833            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
834                "remove node {} cannot contain child relations",
835                node.entity
836            ))));
837        }
838        let descriptor = self
839            .repository
840            .metadata
841            .context
842            .require_entity(&node.entity)
843            .map_err(RepositoryError::Runtime)?;
844        let id_property = descriptor.id_property().ok_or_else(|| {
845            RepositoryError::Runtime(RuntimeError::Graph(format!(
846                "entity {} has no id property for graph remove",
847                node.entity
848            )))
849        })?;
850        let id = node
851            .values
852            .get(&id_property.name)
853            .filter(|value| !is_unassigned_id_value(value))
854            .cloned()
855            .ok_or_else(|| {
856                RepositoryError::Runtime(RuntimeError::Graph(format!(
857                    "remove node {} missing id property {}",
858                    node.entity, id_property.name
859                )))
860            })?;
861        let current = self
862            .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain).await?
863            .ok_or_else(|| {
864                RepositoryError::Runtime(RuntimeError::Graph(format!(
865                    "remove node {}({}) does not exist",
866                    node.entity,
867                    graph_identity_key(&id)
868                )))
869            })?;
870        if let Some(version_property) = descriptor.version_property() {
871            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
872                if *existing_version < 0 {
873                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
874                        "remove node {}({}) is already deleted",
875                        node.entity,
876                        graph_identity_key(&id)
877                    ))));
878                }
879            }
880        }
881        Ok(())
882    }
883
884    fn graph_node_from_record(
885        &self,
886        entity: &str,
887        record: Record,
888    ) -> Result<GraphNode, RuntimeError> {
889        let descriptor = self.repository.metadata.context.require_entity(entity)?;
890        let mut node = GraphNode::new(entity);
891
892        for (field, value) in record {
893            if field == "_comment" {
894                if let Value::Text(comment) = value {
895                    node.set_comment(comment);
896                }
897                continue;
898            }
899            let Some(relation) = descriptor.relation_by_name(&field) else {
900                node.values.insert(field, value);
901                continue;
902            };
903
904            match value {
905                Value::Null => {
906                    node.relations.entry(field).or_default();
907                }
908                Value::Object(record) => {
909                    let child = self.graph_node_from_record(&relation.target_entity, record)?;
910                    node.relations.entry(field).or_default().push(child);
911                }
912                Value::List(values) => {
913                    let children = node.relations.entry(field.clone()).or_default();
914                    for value in values {
915                        let Value::Object(record) = value else {
916                            return Err(RuntimeError::Graph(format!(
917                                "relation {}.{} expects object children, got {:?}",
918                                entity, field, value
919                            )));
920                        };
921                        children
922                            .push(self.graph_node_from_record(&relation.target_entity, record)?);
923                    }
924                }
925                other => {
926                    return Err(RuntimeError::Graph(format!(
927                        "relation {}.{} expects object/list/null, got {:?}",
928                        entity, field, other
929                    )));
930                }
931            }
932        }
933
934        Ok(node)
935    }
936
937    fn graph_update_command(
938        &self,
939        node: &mut GraphNode,
940        descriptor: &EntityDescriptor,
941        id_property: &PropertyDescriptor,
942        id: &Value,
943    ) -> Result<UpdateCommand, RepositoryError<E::Error>> {
944        crate::mark_record_status(&mut node.values, crate::CheckObjectStatus::Update);
945        let check_result = self
946            .repository
947            .metadata
948            .context
949            .check_and_fix_record(&node.entity, &mut node.values);
950        crate::clear_record_status(&mut node.values);
951        check_result.map_err(RepositoryError::Runtime)?;
952
953        let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
954        command.old_values = node.original_values.clone();
955        if let Some(version_property) = descriptor.version_property() {
956            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
957                command = command.expected_version(*version);
958            }
959        }
960        // Filter properties by dirty_fields when available (Java-style minimal UPDATE).
961        // When dirty_fields is Some, only modified fields are included in the SET clause.
962        // When dirty_fields is None (no tracking), fall back to all fields in node.values.
963        for property in descriptor.properties.iter().filter(|property| {
964            !property.is_id
965                && !property.is_version
966                && property.name != id_property.name
967                && match &node.dirty_fields {
968                    Some(dirty) => dirty.contains(&property.name),
969                    None => node.values.contains_key(&property.name),
970                }
971        }) {
972            if let Some(value) = node.values.get(&property.name) {
973                command.values.insert(property.name.clone(), value.clone());
974            }
975        }
976        Ok(command)
977    }
978
979    fn delete_graph_node<'b, 's: 'b>(
980        &'b self,
981        node: &'b GraphNode,
982        parent_scope: Option<&'s ScopedCommentNode<'s>>,
983    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, RepositoryError<E::Error>>> + Send + '_>> {
984        Box::pin(async move {
985        let descriptor = self
986            .repository
987            .metadata
988            .context
989            .require_entity(&node.entity)
990            .map_err(RepositoryError::Runtime)?;
991        let id_property = descriptor.id_property().ok_or_else(|| {
992            RepositoryError::Runtime(RuntimeError::Graph(format!(
993                "entity {} has no id property for graph remove",
994                node.entity
995            )))
996        })?;
997        let id = node
998            .values
999            .get(&id_property.name)
1000            .filter(|value| !is_unassigned_id_value(value))
1001            .cloned()
1002            .ok_or_else(|| {
1003                RepositoryError::Runtime(RuntimeError::Graph(format!(
1004                    "remove node {} missing id property {}",
1005                    node.entity, id_property.name
1006                )))
1007            })?;
1008        let mut delete = DeleteCommand::new(node.entity.clone(), id);
1009        if let Some(version_property) = descriptor.version_property() {
1010            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
1011                delete = delete.expected_version(*version);
1012            }
1013        }
1014
1015        // Create scope node for deletion if parent/node comment is present
1016        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
1017            parent: parent_scope,
1018            track: teaql_core::TraceNode {
1019                entity_type: node.entity.clone(),
1020                entity_id: node
1021                    .id()
1022                    .and_then(|v| match v {
1023                        Value::U64(n) => Some(*n),
1024                        Value::I64(n) => Some(*n as u64),
1025                        _ => None,
1026                    }),
1027                comment: c.clone(),
1028            },
1029        });
1030        let active_scope = current_scope.as_ref().or(parent_scope);
1031        let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
1032
1033        self.delete_scoped(&delete, lineage).await
1034        })
1035    }
1036
1037    pub async fn fetch_graph_current_row(
1038        &self,
1039        entity: &str,
1040        id_property: &str,
1041        id: &Value,
1042        trace_chain: Vec<teaql_core::TraceNode>,
1043    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
1044        let mut query = SelectQuery::new(entity).filter(Expr::eq(id_property, id.clone()));
1045        query.trace_chain = trace_chain;
1046        let mut rows = self
1047            .scoped_repository(entity.to_owned())
1048            .fetch_all(&query).await?;
1049        Ok(rows.pop())
1050    }
1051
1052    async fn fetch_graph_children(
1053        &self,
1054        entity: &str,
1055        foreign_key: &str,
1056        parent_value: &Value,
1057        trace_chain: Vec<teaql_core::TraceNode>,
1058    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
1059        let mut query = SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone()));
1060        query.trace_chain = trace_chain;
1061        self.scoped_repository(entity.to_owned()).fetch_all(&query).await
1062    }
1063}