Skip to main content

teaql_runtime/repository/
graph.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use teaql_core::{
5    DeleteCommand, Entity, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record,
6    SelectQuery, UpdateCommand, Value,
7};
8
9use crate::{
10    GraphMutationKind, GraphMutationPlan, GraphNode, GraphOperation, RepositoryError,
11    RuntimeError, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
12};
13use crate::entity_status::EntityStatus;
14
15use super::{ResolvedRepository, helpers::*};
16
17impl<'a, E> ResolvedRepository<'a, E>
18where
19    E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
20{
21    pub async fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
22        if node.entity != self.entity {
23            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
24                "resolved repository {} cannot save graph root {}",
25                self.entity, node.entity
26            ))));
27        }
28        self.upsert_graph_node_scoped(node, None).await
29    }
30
31    pub async fn save_entity_graph_from(&self, graph: teaql_core::EntityGraph) -> Result<GraphNode, RepositoryError<E::Error>> {
32        fn convert(node: teaql_core::EntityGraphNode) -> GraphNode {
33            let mut relations = BTreeMap::new();
34            for (rel_name, child) in node.children {
35                relations.entry(rel_name).or_insert_with(Vec::new).push(convert(child));
36            }
37            GraphNode {
38                entity: node.entity_type,
39                values: node.record,
40                relations,
41                operation: match node.operation {
42                    teaql_core::EntityGraphOperation::Save => crate::GraphOperation::Upsert,
43                    teaql_core::EntityGraphOperation::Delete => crate::GraphOperation::Remove,
44                },
45                comment: node.comment,
46                dirty_fields: None, original_values: None,
47            }
48        }
49        self.save_graph(convert(graph.root)).await
50    }
51
52    pub async fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
53    where
54        T: Entity,
55    {
56        let node = self
57            .graph_node_from_entity(entity)
58            .map_err(RepositoryError::Runtime)?;
59        self.save_graph(node).await
60    }
61
62    pub async fn save_entity<T>(&self, entity: T, status: EntityStatus) -> Result<GraphNode, RepositoryError<E::Error>>
63    where
64        T: Entity,
65    {
66        if !status.need_persist() {
67            return Ok(GraphNode::new(&self.entity));
68        }
69        if status.is_deleted() {
70            let mut node = self.graph_node_from_entity(entity)
71                .map_err(RepositoryError::Runtime)?;
72            node.operation = GraphOperation::Remove;
73            node.relations.clear();
74            self.save_graph(node).await
75        } else {
76            self.save_entity_graph(entity).await
77        }
78    }
79    pub async fn save_entity_with_comment<T>(&self, entity: T, status: EntityStatus, comment: impl Into<String>) -> Result<GraphNode, RepositoryError<E::Error>>
80    where
81        T: Entity,
82    {
83        if status.is_deleted() {
84            let mut node = self.graph_node_from_entity(entity)
85                .map_err(RepositoryError::Runtime)?;
86            node.operation = GraphOperation::Remove;
87            node.relations.clear();
88            node.set_comment(comment);
89            self.save_graph(node).await
90        } else {
91            self.save_entity_graph_with_comment(entity, comment).await
92        }
93    }
94    pub async fn save_entity_graph_with_comment<T>(
95        &self,
96        entity: T,
97        comment: impl Into<String>,
98    ) -> Result<GraphNode, RepositoryError<E::Error>>
99    where
100        T: Entity,
101    {
102        let mut node = self
103            .graph_node_from_entity(entity)
104            .map_err(RepositoryError::Runtime)?;
105        node.set_comment(comment);
106        self.save_graph(node).await
107    }
108
109    /// Create a new entity graph with an annotation comment on the root node.
110    /// This assumes all new nodes do not exist in the database, skipping existence checks
111    /// and throwing an exception on primary key conflict.
112    pub async fn create_entity_graph_with_comment<T>(
113        &self,
114        entity: T,
115        comment: impl Into<String>,
116    ) -> Result<GraphNode, RepositoryError<E::Error>>
117    where
118        T: Entity,
119    {
120        let mut node = self
121            .graph_node_from_entity(entity)
122            .map_err(RepositoryError::Runtime)?;
123        node.operation = GraphOperation::Create;
124        node.set_comment(comment);
125        self.save_graph(node).await
126    }
127
128    pub async fn plan_graph(
129        &self,
130        node: GraphNode,
131    ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
132        if node.entity != self.entity {
133            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
134                "resolved repository {} cannot plan graph root {}",
135                self.entity, node.entity
136            ))));
137        }
138        let mut node = node;
139        let mut plan = GraphMutationPlan::default();
140        self.collect_graph_plan(&mut node, &mut plan, None, None, false).await?;
141        plan.planned_root = Some(node);
142        plan.rebuild_batches();
143        Ok(plan)
144    }
145
146    pub async fn execute_graph_plan(
147        &self,
148        plan: GraphMutationPlan,
149    ) -> Result<GraphNode, RepositoryError<E::Error>> {
150        let Some(root) = plan.planned_root else {
151            return Err(RepositoryError::Runtime(RuntimeError::Graph(
152                "graph mutation plan has no planned root".to_owned(),
153            )));
154        };
155
156        for batch in plan.batches {
157            if batch.items.is_empty() {
158                continue;
159            }
160            match batch.kind {
161                GraphMutationKind::Create => {
162                    let mut cmd = teaql_core::BatchInsertCommand::new(&batch.entity);
163                    for item in batch.items {
164                        cmd.batch_values.push(item.values);
165                        if let Some(token) = item.scope_token {
166                            cmd.trace_chains.push(token.recover_trace_chain());
167                        } else {
168                            cmd.trace_chains.push(Vec::new());
169                        }
170                    }
171                    self.execute_prepared_batch_insert(cmd).await?;
172                }
173                GraphMutationKind::Update => {
174                    let mut cmd = teaql_core::BatchUpdateCommand::new(&batch.entity, batch.update_fields);
175                    for item in batch.items {
176                        let id = item.values.get("id").cloned().ok_or_else(|| {
177                            RepositoryError::Runtime(RuntimeError::Graph(format!(
178                                "update item in batch missing id for {}", batch.entity
179                            )))
180                        })?;
181                        let version = item.values.get("version").and_then(|v| {
182                            if let teaql_core::Value::I64(n) = v { Some(*n) } else { None }
183                        });
184                        cmd.batch_values.push(item.values);
185                        cmd.batch_ids.push(id);
186                        cmd.batch_expected_versions.push(version);
187                        cmd.batch_old_values.push(item.old_values);
188                        if let Some(token) = item.scope_token {
189                            cmd.trace_chains.push(token.recover_trace_chain());
190                        } else {
191                            cmd.trace_chains.push(Vec::new());
192                        }
193                    }
194                    self.execute_prepared_batch_update(cmd).await?;
195                }
196                GraphMutationKind::Delete => {
197                    // For now, loop individually since we lack BatchDeleteCommand
198                    for item in batch.items {
199                        let id = item.values.get("id").cloned().ok_or_else(|| {
200                            RepositoryError::Runtime(RuntimeError::Graph(format!(
201                                "delete item in batch missing id for {}", batch.entity
202                            )))
203                        })?;
204                        let mut cmd = teaql_core::DeleteCommand::new(&batch.entity, id);
205                        if let Some(teaql_core::Value::I64(version)) = item.values.get("version") {
206                            cmd = cmd.expected_version(*version);
207                        }
208                        let trace_chain = if let Some(token) = item.scope_token { token.recover_trace_chain() } else { Vec::new() };
209                        self.delete_scoped(&cmd, trace_chain).await?;
210                    }
211                }
212                GraphMutationKind::Reference => {
213                    // References are skipped in execution, they only validate during traversal
214                }
215            }
216        }
217
218        Ok(root)
219    }
220
221    pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
222    where
223        T: Entity,
224    {
225        let descriptor = T::entity_descriptor();
226        if descriptor.name != self.entity {
227            return Err(RuntimeError::Graph(format!(
228                "resolved repository {} cannot extract graph root {}",
229                self.entity, descriptor.name
230            )));
231        }
232        // Extract dirty field names BEFORE into_record() consumes the entity.
233        // This is the Rust equivalent of Java's entity.getUpdatedProperties().
234        let dirty_fields = entity.dirty_fields();
235        let original_values = entity.original_values();
236        let is_deleted = entity.is_marked_as_delete();
237        let comment = entity.get_comment();
238        let mut node = self.graph_node_from_record(&descriptor.name, entity.into_record())?;
239        node.dirty_fields = dirty_fields;
240        node.original_values = original_values;
241        if is_deleted {
242            node.operation = GraphOperation::Remove;
243            node.relations.clear();
244        }
245        if let Some(c) = comment {
246            node.set_comment(c);
247        }
248        Ok(node)
249    }
250
251    fn collect_graph_plan<'b, 's: 'b>(
252        &'b self,
253        node: &'b mut GraphNode,
254        plan: &'b mut GraphMutationPlan,
255        parent_scope: Option<&'s ScopedCommentNode<'s>>,
256        parent_token: Option<Arc<TraceScopeToken>>,
257        parent_is_create: bool,
258    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + '_>> {
259        Box::pin(async move {
260        match node.operation {
261            GraphOperation::Reference => {
262                plan.push(
263                    node.entity.clone(),
264                    GraphMutationKind::Reference,
265                    node.values.clone(),
266                    Vec::new(),
267                    parent_token,
268                    node.original_values.clone(),
269                );
270                return Ok(());
271            }
272            GraphOperation::Remove => {
273                plan.push(
274                    node.entity.clone(),
275                    GraphMutationKind::Delete,
276                    node.values.clone(),
277                    Vec::new(),
278                    parent_token,
279                    node.original_values.clone(),
280                );
281                return Ok(());
282            }
283            GraphOperation::Upsert | GraphOperation::Create => {}
284        }
285
286        let descriptor = self
287            .repository
288            .metadata
289            .context
290            .require_entity(&node.entity)
291            .map_err(RepositoryError::Runtime)?;
292
293        // Create scope node on the current stack frame if this node has a comment
294        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
295            parent: parent_scope,
296            track: teaql_core::TraceNode {
297                entity_type: node.entity.clone(),
298                entity_id: node.id().and_then(|v| match v {
299                    Value::U64(n) => Some(*n),
300                    Value::I64(n) => Some(*n as u64),
301                    _ => None,
302                }),
303                comment: c.clone(),
304            },
305        });
306        let active_scope = current_scope.as_ref().or(parent_scope);
307
308        let id_property = descriptor.id_property().cloned();
309        let id = id_property.as_ref().and_then(|property| {
310            node.values
311                .get(&property.name)
312                .filter(|value| !is_unassigned_id_value(value))
313                .cloned()
314        });
315
316        let is_create_op = node.operation == GraphOperation::Create || (parent_is_create && node.operation == GraphOperation::Upsert);
317
318        let is_update = if is_create_op {
319            false
320        } else {
321            match (id_property.as_ref(), id.as_ref()) {
322                (Some(id_property), Some(id)) => self
323                    .fetch_graph_current_row(&node.entity, &id_property.name, id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?
324                    .is_some(),
325                _ => false,
326            }
327        };
328        if !is_update {
329            if let Some(id_property) = id_property.as_ref() {
330                let needs_id = !node.values.contains_key(&id_property.name)
331                    || node
332                        .values
333                        .get(&id_property.name)
334                        .is_some_and(is_unassigned_id_value);
335                if needs_id {
336                    let id = self
337                        .repository
338                        .metadata
339                        .context
340                        .next_id(&node.entity)
341                        .map_err(RepositoryError::Runtime)?;
342                    node.values.insert(id_property.name.clone(), Value::U64(id));
343                }
344            }
345            ensure_initial_version(&mut node.values, descriptor);
346        }
347        let update_fields = if is_update {
348            let mut excluded = Vec::new();
349            if let Some(id_property) = id_property.as_ref() {
350                excluded.push(id_property.name.clone());
351            }
352            if let Some(version_property) = descriptor.version_property() {
353                excluded.push(version_property.name.clone());
354            }
355            sorted_update_fields(&node.values, excluded)
356        } else {
357            Vec::new()
358        };
359
360        // Build the TraceScopeToken for this node (only if it has a comment).
361        // This is an Arc-linked persistent list: zero-copy, O(1) creation.
362        let current_token = if let Some(c) = &node.comment {
363            Some(Arc::new(TraceScopeToken {
364                parent: parent_token.clone(),
365                track: teaql_core::TraceNode {
366                    entity_type: node.entity.clone(),
367                    entity_id: node.id().and_then(|v| match v {
368                        Value::U64(n) => Some(*n),
369                        Value::I64(n) => Some(*n as u64),
370                        _ => None,
371                    }),
372                    comment: c.clone(),
373                },
374                node_index: plan.next_item_index,
375            }))
376        } else {
377            parent_token.clone()
378        };
379
380        plan.push(
381            node.entity.clone(),
382            if is_update {
383                GraphMutationKind::Update
384            } else {
385                GraphMutationKind::Create
386            },
387            node.values.clone(),
388            update_fields,
389            current_token.clone(),
390            node.original_values.clone(),
391        );
392
393        for (name, children) in &mut node.relations {
394            let relation = descriptor.relation_by_name(name).ok_or_else(|| {
395                RepositoryError::Runtime(RuntimeError::MissingRelation {
396                    entity: node.entity.clone(),
397                    relation: name.clone(),
398                })
399            })?;
400            let child_repo = self.scoped_repository(relation.target_entity.clone());
401            for child in children {
402                ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
403                child_repo.collect_graph_plan(child, plan, active_scope, current_token.clone(), is_create_op).await?;
404            }
405        }
406        Ok(())
407        })
408    }
409
410    fn insert_graph_node_scoped<'b, 's: 'b>(
411        &'b self,
412        mut node: GraphNode,
413        parent_scope: Option<&'s ScopedCommentNode<'s>>,
414    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<GraphNode, RepositoryError<E::Error>>> + Send + '_>> {
415        Box::pin(async move {
416        match node.operation {
417            GraphOperation::Upsert | GraphOperation::Create => {}
418            GraphOperation::Reference => return self.validate_reference_node(node, parent_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await,
419            GraphOperation::Remove => {
420                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
421                    "create graph cannot remove node {}",
422                    node.entity
423                ))));
424            }
425        }
426
427        // Create scope node on the current stack frame if this node has a comment
428        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
429            parent: parent_scope,
430            track: teaql_core::TraceNode {
431                entity_type: node.entity.clone(),
432                entity_id: node
433                    .id()
434                    .and_then(|v| match v {
435                        Value::U64(n) => Some(*n),
436                        Value::I64(n) => Some(*n as u64),
437                        _ => None,
438                    }),
439                comment: c.clone(),
440            },
441        });
442        let active_scope = current_scope.as_ref().or(parent_scope);
443
444        let descriptor = self
445            .repository
446            .metadata
447            .context
448            .require_entity(&node.entity)
449            .map_err(RepositoryError::Runtime)?;
450
451        let mut one_relations = Vec::new();
452        let mut many_relations = Vec::new();
453        for (name, children) in std::mem::take(&mut node.relations) {
454            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
455                RepositoryError::Runtime(RuntimeError::MissingRelation {
456                    entity: node.entity.clone(),
457                    relation: name.clone(),
458                })
459            })?;
460            if relation.many {
461                many_relations.push((name, relation.clone(), children));
462            } else {
463                one_relations.push((name, relation.clone(), children));
464            }
465        }
466
467        for (name, relation, children) in one_relations {
468            if children.len() > 1 {
469                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
470                    "relation {}.{} expects one child, got {}",
471                    node.entity,
472                    name,
473                    children.len()
474                ))));
475            }
476            let mut saved_children = Vec::new();
477            for child in children {
478                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
479                let child_repo = self.scoped_repository(child.entity.clone());
480                let saved_child = child_repo.insert_graph_node_scoped(child, active_scope).await?;
481                if relation.attach {
482                    let foreign_value = saved_child
483                        .values
484                        .get(&relation.foreign_key)
485                        .cloned()
486                        .ok_or_else(|| {
487                            RepositoryError::Runtime(RuntimeError::Graph(format!(
488                                "saved child {} missing foreign key {} for relation {}.{}",
489                                relation.target_entity, relation.foreign_key, node.entity, name
490                            )))
491                        })?;
492                    node.values
493                        .insert(relation.local_key.clone(), foreign_value);
494                }
495                saved_children.push(saved_child);
496            }
497            node.relations.insert(name, saved_children);
498        }
499
500        let command = self
501            .prepare_insert_command(&InsertCommand {
502                entity: node.entity.clone(),
503                values: node.values.clone(),
504                trace_chain: Vec::new(),
505            })
506            .map_err(RepositoryError::Runtime)?;
507        let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
508        self.execute_prepared_insert_with_comment(command.clone(), lineage).await?;
509        node.values = command.values;
510
511        for (name, relation, children) in many_relations {
512            let local_value = node
513                .values
514                .get(&relation.local_key)
515                .cloned()
516                .ok_or_else(|| {
517                    RepositoryError::Runtime(RuntimeError::Graph(format!(
518                        "parent {} missing local key {} for relation {}",
519                        node.entity, relation.local_key, name
520                    )))
521                })?;
522            let mut saved_children = Vec::new();
523            for mut child in children {
524                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
525                if relation.attach {
526                    child
527                        .values
528                        .insert(relation.foreign_key.clone(), local_value.clone());
529                }
530                let child_repo = self.scoped_repository(child.entity.clone());
531                saved_children.push(child_repo.insert_graph_node_scoped(child, active_scope).await?);
532            }
533            node.relations.insert(name, saved_children);
534        }
535
536        Ok(node)
537        })
538    }
539
540    fn upsert_graph_node_scoped<'b, 's: 'b>(
541        &'b self,
542        mut node: GraphNode,
543        parent_scope: Option<&'s ScopedCommentNode<'s>>,
544    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<GraphNode, RepositoryError<E::Error>>> + Send + '_>> {
545        Box::pin(async move {
546        // Create scope node on the current stack frame if this node has a comment
547        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
548            parent: parent_scope,
549            track: teaql_core::TraceNode {
550                entity_type: node.entity.clone(),
551                entity_id: node
552                    .id()
553                    .and_then(|v| match v {
554                        Value::U64(n) => Some(*n),
555                        Value::I64(n) => Some(*n as u64),
556                        _ => None,
557                    }),
558                comment: c.clone(),
559            },
560        });
561        let active_scope = current_scope.as_ref().or(parent_scope);
562
563        match node.operation {
564            GraphOperation::Upsert | GraphOperation::Create => {}
565            GraphOperation::Reference => return self.validate_reference_node(node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await,
566            GraphOperation::Remove => {
567                self.validate_remove_node(&node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?;
568                self.delete_graph_node(&node, parent_scope).await?;
569                return Ok(node);
570            }
571        }
572
573        let descriptor = self
574            .repository
575            .metadata
576            .context
577            .require_entity(&node.entity)
578            .map_err(RepositoryError::Runtime)?;
579        let Some(id_property) = descriptor.id_property() else {
580            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
581                "entity {} has no id property for graph upsert",
582                node.entity
583            ))));
584        };
585        let Some(id) = node
586            .values
587            .get(&id_property.name)
588            .filter(|value| !is_unassigned_id_value(value))
589            .cloned()
590        else {
591            // Strip comment to prevent duplicate scope — already captured in active_scope
592            node.comment = None;
593            return self.insert_graph_node_scoped(node, active_scope).await;
594        };
595
596        if node.operation == GraphOperation::Create || self
597            .fetch_graph_current_row(&node.entity, &id_property.name, &id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?
598            .is_none()
599        {
600            node.comment = None;
601            return self.insert_graph_node_scoped(node, active_scope).await;
602        }
603
604        let mut one_relations = Vec::new();
605        let mut many_relations = Vec::new();
606        for (name, children) in std::mem::take(&mut node.relations) {
607            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
608                RepositoryError::Runtime(RuntimeError::MissingRelation {
609                    entity: node.entity.clone(),
610                    relation: name.clone(),
611                })
612            })?;
613            if relation.many {
614                many_relations.push((name, relation.clone(), children));
615            } else {
616                one_relations.push((name, relation.clone(), children));
617            }
618        }
619
620        for (name, relation, children) in one_relations {
621            if children.len() > 1 {
622                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
623                    "relation {}.{} expects one child, got {}",
624                    node.entity,
625                    name,
626                    children.len()
627                ))));
628            }
629            let mut saved_children = Vec::new();
630            for child in children {
631                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
632                let child_repo = self.scoped_repository(child.entity.clone());
633                let saved_child = child_repo.upsert_graph_node_scoped(child, active_scope).await?;
634                if relation.attach {
635                    let foreign_value = saved_child
636                        .values
637                        .get(&relation.foreign_key)
638                        .cloned()
639                        .ok_or_else(|| {
640                            RepositoryError::Runtime(RuntimeError::Graph(format!(
641                                "saved child {} missing foreign key {} for relation {}.{}",
642                                relation.target_entity, relation.foreign_key, node.entity, name
643                            )))
644                        })?;
645                    node.values
646                        .insert(relation.local_key.clone(), foreign_value);
647                }
648                saved_children.push(saved_child);
649            }
650            node.relations.insert(name, saved_children);
651        }
652
653        let update = self.graph_update_command(&mut node, descriptor, id_property, &id)?;
654        if !update.values.is_empty() || update.expected_version.is_some() {
655            let prepared_update = self
656                .prepare_update_command(&update)
657                .map_err(RepositoryError::Runtime)?;
658            let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
659            self.execute_prepared_update_with_comment(prepared_update.clone(), lineage).await?;
660            for (field, value) in &prepared_update.values {
661                node.values.insert(field.clone(), value.clone());
662            }
663            if let Some(version_property) = descriptor.version_property() {
664                if let Some(expected_version) = prepared_update.expected_version {
665                    node.values.insert(
666                        version_property.name.clone(),
667                        Value::I64(expected_version + 1),
668                    );
669                }
670            }
671        }
672
673        for (name, relation, children) in many_relations {
674            let local_value = node
675                .values
676                .get(&relation.local_key)
677                .cloned()
678                .ok_or_else(|| {
679                    RepositoryError::Runtime(RuntimeError::Graph(format!(
680                        "parent {} missing local key {} for relation {}",
681                        node.entity, relation.local_key, name
682                    )))
683                })?;
684            let child_repo = self.scoped_repository(relation.target_entity.clone());
685            let child_descriptor = self
686                .repository
687                .metadata
688                .context
689                .require_entity(&relation.target_entity)
690                .map_err(RepositoryError::Runtime)?;
691            let child_id_property = child_descriptor.id_property().ok_or_else(|| {
692                RepositoryError::Runtime(RuntimeError::Graph(format!(
693                    "entity {} has no id property",
694                    relation.target_entity
695                )))
696            })?;
697
698            let mut seen = std::collections::BTreeSet::new();
699            let mut saved_children = Vec::new();
700            for mut child in children {
701                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
702                if relation.attach && child.operation != GraphOperation::Reference {
703                    child
704                        .values
705                        .insert(relation.foreign_key.clone(), local_value.clone());
706                }
707                if let Some(child_id) = child
708                    .values
709                    .get(&child_id_property.name)
710                    .filter(|value| !is_unassigned_id_value(value))
711                {
712                    let key = graph_identity_key(child_id);
713                    if !seen.insert(key.clone()) {
714                        return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
715                            "duplicate child id {key} in relation {}.{}",
716                            node.entity, name
717                        ))));
718                    }
719                }
720                saved_children.push(child_repo.upsert_graph_node_scoped(child, active_scope).await?);
721            }
722
723
724
725            node.relations.insert(name, saved_children);
726        }
727
728        Ok(node)
729        })
730    }
731
732    async fn validate_reference_node(
733        &self,
734        node: GraphNode,
735        trace_chain: Vec<teaql_core::TraceNode>,
736    ) -> Result<GraphNode, RepositoryError<E::Error>> {
737        if !node.relations.is_empty() {
738            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
739                "reference node {} cannot contain child relations",
740                node.entity
741            ))));
742        }
743        let descriptor = self
744            .repository
745            .metadata
746            .context
747            .require_entity(&node.entity)
748            .map_err(RepositoryError::Runtime)?;
749        let id_property = descriptor.id_property().ok_or_else(|| {
750            RepositoryError::Runtime(RuntimeError::Graph(format!(
751                "entity {} has no id property for graph reference",
752                node.entity
753            )))
754        })?;
755        let id = node
756            .values
757            .get(&id_property.name)
758            .filter(|value| !is_unassigned_id_value(value))
759            .cloned()
760            .ok_or_else(|| {
761                RepositoryError::Runtime(RuntimeError::Graph(format!(
762                    "reference node {} missing id property {}",
763                    node.entity, id_property.name
764                )))
765            })?;
766
767        for field in node.values.keys() {
768            if field == &id_property.name {
769                continue;
770            }
771            if descriptor
772                .version_property()
773                .map(|property| field == &property.name)
774                .unwrap_or(false)
775            {
776                continue;
777            }
778            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
779                "reference node {} cannot carry mutable field {}",
780                node.entity, field
781            ))));
782        }
783
784        let current = self
785            .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain).await?
786            .ok_or_else(|| {
787                RepositoryError::Runtime(RuntimeError::Graph(format!(
788                    "reference node {}({}) does not exist",
789                    node.entity,
790                    graph_identity_key(&id)
791                )))
792            })?;
793
794        if let Some(version_property) = descriptor.version_property() {
795            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
796                if *existing_version < 0 {
797                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
798                        "reference node {}({}) is deleted",
799                        node.entity,
800                        graph_identity_key(&id)
801                    ))));
802                }
803                if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
804                {
805                    if expected_version != existing_version {
806                        return Err(RepositoryError::Runtime(
807                            RuntimeError::OptimisticLockConflict {
808                                entity: node.entity,
809                                id: graph_identity_key(&id),
810                            },
811                        ));
812                    }
813                }
814            }
815        }
816
817        Ok(GraphNode {
818            entity: node.entity,
819            values: current,
820            relations: BTreeMap::new(),
821            operation: GraphOperation::Reference,
822            comment: None,
823            dirty_fields: None, original_values: None,
824        })
825    }
826
827    async fn validate_remove_node(&self, node: &GraphNode, trace_chain: Vec<teaql_core::TraceNode>) -> Result<(), RepositoryError<E::Error>> {
828        if !node.relations.is_empty() {
829            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
830                "remove node {} cannot contain child relations",
831                node.entity
832            ))));
833        }
834        let descriptor = self
835            .repository
836            .metadata
837            .context
838            .require_entity(&node.entity)
839            .map_err(RepositoryError::Runtime)?;
840        let id_property = descriptor.id_property().ok_or_else(|| {
841            RepositoryError::Runtime(RuntimeError::Graph(format!(
842                "entity {} has no id property for graph remove",
843                node.entity
844            )))
845        })?;
846        let id = node
847            .values
848            .get(&id_property.name)
849            .filter(|value| !is_unassigned_id_value(value))
850            .cloned()
851            .ok_or_else(|| {
852                RepositoryError::Runtime(RuntimeError::Graph(format!(
853                    "remove node {} missing id property {}",
854                    node.entity, id_property.name
855                )))
856            })?;
857        let current = self
858            .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain).await?
859            .ok_or_else(|| {
860                RepositoryError::Runtime(RuntimeError::Graph(format!(
861                    "remove node {}({}) does not exist",
862                    node.entity,
863                    graph_identity_key(&id)
864                )))
865            })?;
866        if let Some(version_property) = descriptor.version_property() {
867            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
868                if *existing_version < 0 {
869                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
870                        "remove node {}({}) is already deleted",
871                        node.entity,
872                        graph_identity_key(&id)
873                    ))));
874                }
875            }
876        }
877        Ok(())
878    }
879
880    fn graph_node_from_record(
881        &self,
882        entity: &str,
883        record: Record,
884    ) -> Result<GraphNode, RuntimeError> {
885        let descriptor = self.repository.metadata.context.require_entity(entity)?;
886        let mut node = GraphNode::new(entity);
887
888        for (field, value) in record {
889            if field == "_comment" {
890                if let Value::Text(comment) = value {
891                    node.set_comment(comment);
892                }
893                continue;
894            }
895            let Some(relation) = descriptor.relation_by_name(&field) else {
896                node.values.insert(field, value);
897                continue;
898            };
899
900            match value {
901                Value::Null => {
902                    node.relations.entry(field).or_default();
903                }
904                Value::Object(record) => {
905                    let child = self.graph_node_from_record(&relation.target_entity, record)?;
906                    node.relations.entry(field).or_default().push(child);
907                }
908                Value::List(values) => {
909                    let children = node.relations.entry(field.clone()).or_default();
910                    for value in values {
911                        let Value::Object(record) = value else {
912                            return Err(RuntimeError::Graph(format!(
913                                "relation {}.{} expects object children, got {:?}",
914                                entity, field, value
915                            )));
916                        };
917                        children
918                            .push(self.graph_node_from_record(&relation.target_entity, record)?);
919                    }
920                }
921                other => {
922                    return Err(RuntimeError::Graph(format!(
923                        "relation {}.{} expects object/list/null, got {:?}",
924                        entity, field, other
925                    )));
926                }
927            }
928        }
929
930        Ok(node)
931    }
932
933    fn graph_update_command(
934        &self,
935        node: &mut GraphNode,
936        descriptor: &EntityDescriptor,
937        id_property: &PropertyDescriptor,
938        id: &Value,
939    ) -> Result<UpdateCommand, RepositoryError<E::Error>> {
940        crate::mark_record_status(&mut node.values, crate::CheckObjectStatus::Update);
941        let check_result = self
942            .repository
943            .metadata
944            .context
945            .check_and_fix_record(&node.entity, &mut node.values);
946        crate::clear_record_status(&mut node.values);
947        check_result.map_err(RepositoryError::Runtime)?;
948
949        let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
950        command.old_values = node.original_values.clone();
951        if let Some(version_property) = descriptor.version_property() {
952            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
953                command = command.expected_version(*version);
954            }
955        }
956        // Filter properties by dirty_fields when available (Java-style minimal UPDATE).
957        // When dirty_fields is Some, only modified fields are included in the SET clause.
958        // When dirty_fields is None (no tracking), fall back to all fields in node.values.
959        for property in descriptor.properties.iter().filter(|property| {
960            !property.is_id
961                && !property.is_version
962                && property.name != id_property.name
963                && match &node.dirty_fields {
964                    Some(dirty) => dirty.contains(&property.name),
965                    None => node.values.contains_key(&property.name),
966                }
967        }) {
968            if let Some(value) = node.values.get(&property.name) {
969                command.values.insert(property.name.clone(), value.clone());
970            }
971        }
972        Ok(command)
973    }
974
975    fn delete_graph_node<'b, 's: 'b>(
976        &'b self,
977        node: &'b GraphNode,
978        parent_scope: Option<&'s ScopedCommentNode<'s>>,
979    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, RepositoryError<E::Error>>> + Send + '_>> {
980        Box::pin(async move {
981        let descriptor = self
982            .repository
983            .metadata
984            .context
985            .require_entity(&node.entity)
986            .map_err(RepositoryError::Runtime)?;
987        let id_property = descriptor.id_property().ok_or_else(|| {
988            RepositoryError::Runtime(RuntimeError::Graph(format!(
989                "entity {} has no id property for graph remove",
990                node.entity
991            )))
992        })?;
993        let id = node
994            .values
995            .get(&id_property.name)
996            .filter(|value| !is_unassigned_id_value(value))
997            .cloned()
998            .ok_or_else(|| {
999                RepositoryError::Runtime(RuntimeError::Graph(format!(
1000                    "remove node {} missing id property {}",
1001                    node.entity, id_property.name
1002                )))
1003            })?;
1004        let mut delete = DeleteCommand::new(node.entity.clone(), id);
1005        if let Some(version_property) = descriptor.version_property() {
1006            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
1007                delete = delete.expected_version(*version);
1008            }
1009        }
1010
1011        // Create scope node for deletion if parent/node comment is present
1012        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
1013            parent: parent_scope,
1014            track: teaql_core::TraceNode {
1015                entity_type: node.entity.clone(),
1016                entity_id: node
1017                    .id()
1018                    .and_then(|v| match v {
1019                        Value::U64(n) => Some(*n),
1020                        Value::I64(n) => Some(*n as u64),
1021                        _ => None,
1022                    }),
1023                comment: c.clone(),
1024            },
1025        });
1026        let active_scope = current_scope.as_ref().or(parent_scope);
1027        let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
1028
1029        self.delete_scoped(&delete, lineage).await
1030        })
1031    }
1032
1033    pub async fn fetch_graph_current_row(
1034        &self,
1035        entity: &str,
1036        id_property: &str,
1037        id: &Value,
1038        trace_chain: Vec<teaql_core::TraceNode>,
1039    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
1040        let mut query = SelectQuery::new(entity).filter(Expr::eq(id_property, id.clone()));
1041        query.trace_chain = trace_chain;
1042        let mut rows = self
1043            .scoped_repository(entity.to_owned())
1044            .fetch_all(&query).await?;
1045        Ok(rows.pop())
1046    }
1047
1048    async fn fetch_graph_children(
1049        &self,
1050        entity: &str,
1051        foreign_key: &str,
1052        parent_value: &Value,
1053        trace_chain: Vec<teaql_core::TraceNode>,
1054    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
1055        let mut query = SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone()));
1056        query.trace_chain = trace_chain;
1057        self.scoped_repository(entity.to_owned()).fetch_all(&query).await
1058    }
1059}