Skip to main content

teaql_runtime/repository/
graph.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use teaql_core::{
5    DeleteCommand, Entity, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record,
6    SelectQuery, UpdateCommand, Value,
7};
8
9use crate::{
10    GraphMutationKind, GraphMutationPlan, GraphNode, GraphOperation, RepositoryError,
11    RuntimeError, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
12};
13use crate::entity_status::EntityStatus;
14
15use super::{ResolvedRepository, helpers::*};
16
17impl<'a, E> ResolvedRepository<'a, E>
18where
19    E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
20{
21    pub async fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
22        if node.entity != self.entity {
23            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
24                "resolved repository {} cannot save graph root {}",
25                self.entity, node.entity
26            ))));
27        }
28        let plan = self.plan_graph(node).await?;
29        self.execute_graph_plan(plan).await
30    }
31
32    pub async fn save_entity_graph_from(&self, graph: teaql_core::EntityGraph) -> Result<GraphNode, RepositoryError<E::Error>> {
33        fn convert(node: teaql_core::EntityGraphNode) -> GraphNode {
34            let mut relations = BTreeMap::new();
35            for (rel_name, child) in node.children {
36                relations.entry(rel_name).or_insert_with(Vec::new).push(convert(child));
37            }
38            GraphNode {
39                entity: node.entity_type,
40                values: node.record,
41                relations,
42                operation: match node.operation {
43                    teaql_core::EntityGraphOperation::Save => crate::GraphOperation::Upsert,
44                    teaql_core::EntityGraphOperation::Delete => crate::GraphOperation::Remove,
45                },
46                comment: node.comment,
47                dirty_fields: None, original_values: None,
48            }
49        }
50        self.save_graph(convert(graph.root)).await
51    }
52
53    pub async fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
54    where
55        T: Entity,
56    {
57        let node = self
58            .graph_node_from_entity(entity)
59            .map_err(RepositoryError::Runtime)?;
60        self.save_graph(node).await
61    }
62
63    pub async fn save_entity<T>(&self, entity: T, status: EntityStatus) -> Result<GraphNode, RepositoryError<E::Error>>
64    where
65        T: Entity,
66    {
67        if !status.need_persist() {
68            return Ok(GraphNode::new(&self.entity));
69        }
70        if status.is_deleted() {
71            let mut node = self.graph_node_from_entity(entity)
72                .map_err(RepositoryError::Runtime)?;
73            node.operation = GraphOperation::Remove;
74            node.relations.clear();
75            self.save_graph(node).await
76        } else {
77            self.save_entity_graph(entity).await
78        }
79    }
80    pub async fn save_entity_with_comment<T>(&self, entity: T, status: EntityStatus, comment: impl Into<String>) -> Result<GraphNode, RepositoryError<E::Error>>
81    where
82        T: Entity,
83    {
84        if status.is_deleted() {
85            let mut node = self.graph_node_from_entity(entity)
86                .map_err(RepositoryError::Runtime)?;
87            node.operation = GraphOperation::Remove;
88            node.relations.clear();
89            node.set_comment(comment);
90            self.save_graph(node).await
91        } else {
92            self.save_entity_graph_with_comment(entity, comment).await
93        }
94    }
95    pub async fn save_entity_graph_with_comment<T>(
96        &self,
97        entity: T,
98        comment: impl Into<String>,
99    ) -> Result<GraphNode, RepositoryError<E::Error>>
100    where
101        T: Entity,
102    {
103        let mut node = self
104            .graph_node_from_entity(entity)
105            .map_err(RepositoryError::Runtime)?;
106        node.set_comment(comment);
107        self.save_graph(node).await
108    }
109
110    /// Create a new entity graph with an annotation comment on the root node.
111    /// This assumes all new nodes do not exist in the database, skipping existence checks
112    /// and throwing an exception on primary key conflict.
113    pub async fn create_entity_graph_with_comment<T>(
114        &self,
115        entity: T,
116        comment: impl Into<String>,
117    ) -> Result<GraphNode, RepositoryError<E::Error>>
118    where
119        T: Entity,
120    {
121        let mut node = self
122            .graph_node_from_entity(entity)
123            .map_err(RepositoryError::Runtime)?;
124        node.operation = GraphOperation::Create;
125        node.set_comment(comment);
126        self.save_graph(node).await
127    }
128
129    pub async fn plan_graph(
130        &self,
131        node: GraphNode,
132    ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
133        if node.entity != self.entity {
134            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
135                "resolved repository {} cannot plan graph root {}",
136                self.entity, node.entity
137            ))));
138        }
139        let mut node = node;
140        let mut plan = GraphMutationPlan::default();
141        self.collect_graph_plan(&mut node, &mut plan, None, None, false).await?;
142        plan.planned_root = Some(node);
143        plan.rebuild_batches();
144        Ok(plan)
145    }
146
147    pub async fn execute_graph_plan(
148        &self,
149        plan: GraphMutationPlan,
150    ) -> Result<GraphNode, RepositoryError<E::Error>> {
151        let Some(root) = plan.planned_root else {
152            return Err(RepositoryError::Runtime(RuntimeError::Graph(
153                "graph mutation plan has no planned root".to_owned(),
154            )));
155        };
156
157        for batch in plan.batches {
158            if batch.items.is_empty() || (matches!(batch.kind, GraphMutationKind::Update) && batch.update_fields.is_empty()) {
159                continue;
160            }
161            match batch.kind {
162                GraphMutationKind::Create => {
163                    let mut cmd = teaql_core::BatchInsertCommand::new(&batch.entity);
164                    for item in batch.items {
165                        cmd.batch_values.push(item.values);
166                        if let Some(token) = item.scope_token {
167                            cmd.trace_chains.push(token.recover_trace_chain());
168                        } else {
169                            cmd.trace_chains.push(Vec::new());
170                        }
171                    }
172                    self.execute_prepared_batch_insert(cmd).await?;
173                }
174                GraphMutationKind::Update => {
175                    if batch.update_fields.is_empty() {
176                        continue;
177                    }
178                    let mut cmd = teaql_core::BatchUpdateCommand::new(&batch.entity, batch.update_fields);
179                    for item in batch.items {
180                        let id = item.values.get("id").cloned().ok_or_else(|| {
181                            RepositoryError::Runtime(RuntimeError::Graph(format!(
182                                "update item in batch missing id for {}", batch.entity
183                            )))
184                        })?;
185                        let version = item.values.get("version").and_then(|v| {
186                            if let teaql_core::Value::I64(n) = v { Some(*n) } else { None }
187                        });
188                        cmd.batch_values.push(item.values);
189                        cmd.batch_ids.push(id);
190                        cmd.batch_expected_versions.push(version);
191                        cmd.batch_old_values.push(item.old_values);
192                        if let Some(token) = item.scope_token {
193                            cmd.trace_chains.push(token.recover_trace_chain());
194                        } else {
195                            cmd.trace_chains.push(Vec::new());
196                        }
197                    }
198                    self.execute_prepared_batch_update(cmd).await?;
199                }
200                GraphMutationKind::Delete => {
201                    // For now, loop individually since we lack BatchDeleteCommand
202                    for item in batch.items {
203                        let id = item.values.get("id").cloned().ok_or_else(|| {
204                            RepositoryError::Runtime(RuntimeError::Graph(format!(
205                                "delete item in batch missing id for {}", batch.entity
206                            )))
207                        })?;
208                        let mut cmd = teaql_core::DeleteCommand::new(&batch.entity, id);
209                        if let Some(teaql_core::Value::I64(version)) = item.values.get("version") {
210                            cmd = cmd.expected_version(*version);
211                        }
212                        let trace_chain = if let Some(token) = item.scope_token { token.recover_trace_chain() } else { Vec::new() };
213                        self.delete_scoped(&cmd, trace_chain).await?;
214                    }
215                }
216                GraphMutationKind::Reference => {
217                    // References are skipped in execution, they only validate during traversal
218                }
219            }
220        }
221
222        Ok(root)
223    }
224
225    pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
226    where
227        T: Entity,
228    {
229        let descriptor = T::entity_descriptor();
230        if descriptor.name != self.entity {
231            return Err(RuntimeError::Graph(format!(
232                "resolved repository {} cannot extract graph root {}",
233                self.entity, descriptor.name
234            )));
235        }
236        // Extract dirty field names BEFORE into_record() consumes the entity.
237        // This is the Rust equivalent of Java's entity.getUpdatedProperties().
238        let dirty_fields = entity.dirty_fields();
239        let original_values = entity.original_values();
240        let is_deleted = entity.is_marked_as_delete();
241        let comment = entity.get_comment();
242        let mut node = self.graph_node_from_record(&descriptor.name, entity.into_record())?;
243        node.dirty_fields = dirty_fields;
244        node.original_values = original_values;
245        if is_deleted {
246            node.operation = GraphOperation::Remove;
247            node.relations.clear();
248        }
249        if let Some(c) = comment {
250            node.set_comment(c);
251        }
252        Ok(node)
253    }
254
255    fn collect_graph_plan<'b, 's: 'b>(
256        &'b self,
257        node: &'b mut GraphNode,
258        plan: &'b mut GraphMutationPlan,
259        parent_scope: Option<&'s ScopedCommentNode<'s>>,
260        parent_token: Option<Arc<TraceScopeToken>>,
261        parent_is_create: bool,
262    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + '_>> {
263        Box::pin(async move {
264        match node.operation {
265            GraphOperation::Reference => {
266                plan.push(
267                    node.entity.clone(),
268                    GraphMutationKind::Reference,
269                    node.values.clone(),
270                    Vec::new(),
271                    parent_token,
272                    node.original_values.clone(),
273                );
274                return Ok(());
275            }
276            GraphOperation::Remove => {
277                plan.push(
278                    node.entity.clone(),
279                    GraphMutationKind::Delete,
280                    node.values.clone(),
281                    Vec::new(),
282                    parent_token,
283                    node.original_values.clone(),
284                );
285                return Ok(());
286            }
287            GraphOperation::Upsert | GraphOperation::Create => {}
288        }
289
290        let descriptor = self
291            .repository
292            .metadata
293            .context
294            .require_entity(&node.entity)
295            .map_err(RepositoryError::Runtime)?;
296
297        // Create scope node on the current stack frame if this node has a comment
298        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
299            parent: parent_scope,
300            track: teaql_core::TraceNode {
301                entity_type: node.entity.clone(),
302                entity_id: node.id().and_then(|v| match v {
303                    Value::U64(n) => Some(*n),
304                    Value::I64(n) => Some(*n as u64),
305                    _ => None,
306                }),
307                comment: c.clone(),
308            },
309        });
310        let active_scope = current_scope.as_ref().or(parent_scope);
311
312        let id_property = descriptor.id_property().cloned();
313        let id = id_property.as_ref().and_then(|property| {
314            node.values
315                .get(&property.name)
316                .filter(|value| !is_unassigned_id_value(value))
317                .cloned()
318        });
319
320        if let Some(id_val) = &id {
321            if !plan.visited_nodes.insert((node.entity.clone(), graph_identity_key(id_val))) {
322                return Ok(());
323            }
324        }
325
326        let is_create_op = node.operation == GraphOperation::Create || (parent_is_create && node.operation == GraphOperation::Upsert);
327
328        let is_update = if is_create_op {
329            false
330        } else {
331            match (id_property.as_ref(), id.as_ref()) {
332                (Some(id_property), Some(id)) => self
333                    .fetch_graph_current_row(&node.entity, &id_property.name, id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?
334                    .is_some(),
335                _ => false,
336            }
337        };
338        if !is_update {
339            if let Some(id_property) = id_property.as_ref() {
340                let needs_id = !node.values.contains_key(&id_property.name)
341                    || node
342                        .values
343                        .get(&id_property.name)
344                        .is_some_and(is_unassigned_id_value);
345                if needs_id {
346                    let id = self
347                        .repository
348                        .metadata
349                        .context
350                        .next_id(&node.entity)
351                        .map_err(RepositoryError::Runtime)?;
352                    node.values.insert(id_property.name.clone(), Value::U64(id));
353                }
354            }
355            ensure_initial_version(&mut node.values, descriptor);
356        }
357        let update_fields = if is_update {
358            let mut excluded = Vec::new();
359            if let Some(id_property) = id_property.as_ref() {
360                excluded.push(id_property.name.clone());
361            }
362            if let Some(version_property) = descriptor.version_property() {
363                excluded.push(version_property.name.clone());
364            }
365            let mut fields = sorted_update_fields(&node.values, excluded);
366            if let Some(dirty) = &node.dirty_fields {
367                fields.retain(|f| dirty.contains(f));
368            }
369            fields
370        } else {
371            Vec::new()
372        };
373
374        // Build the TraceScopeToken for this node (only if it has a comment).
375        // This is an Arc-linked persistent list: zero-copy, O(1) creation.
376        let current_token = if let Some(c) = &node.comment {
377            Some(Arc::new(TraceScopeToken {
378                parent: parent_token.clone(),
379                track: teaql_core::TraceNode {
380                    entity_type: node.entity.clone(),
381                    entity_id: node.id().and_then(|v| match v {
382                        Value::U64(n) => Some(*n),
383                        Value::I64(n) => Some(*n as u64),
384                        _ => None,
385                    }),
386                    comment: c.clone(),
387                },
388                node_index: plan.next_item_index,
389            }))
390        } else {
391            parent_token.clone()
392        };
393
394        plan.push(
395            node.entity.clone(),
396            if is_update {
397                GraphMutationKind::Update
398            } else {
399                GraphMutationKind::Create
400            },
401            node.values.clone(),
402            update_fields,
403            current_token.clone(),
404            node.original_values.clone(),
405        );
406
407        for (name, children) in &mut node.relations {
408            let relation = descriptor.relation_by_name(name).ok_or_else(|| {
409                RepositoryError::Runtime(RuntimeError::MissingRelation {
410                    entity: node.entity.clone(),
411                    relation: name.clone(),
412                })
413            })?;
414            let child_repo = self.scoped_repository(relation.target_entity.clone());
415            for child in children {
416                ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
417                child_repo.collect_graph_plan(child, plan, active_scope, current_token.clone(), is_create_op).await?;
418            }
419        }
420        Ok(())
421        })
422    }
423
424    fn insert_graph_node_scoped<'b, 's: 'b>(
425        &'b self,
426        mut node: GraphNode,
427        parent_scope: Option<&'s ScopedCommentNode<'s>>,
428    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<GraphNode, RepositoryError<E::Error>>> + Send + '_>> {
429        Box::pin(async move {
430        match node.operation {
431            GraphOperation::Upsert | GraphOperation::Create => {}
432            GraphOperation::Reference => return self.validate_reference_node(node, parent_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await,
433            GraphOperation::Remove => {
434                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
435                    "create graph cannot remove node {}",
436                    node.entity
437                ))));
438            }
439        }
440
441        // Create scope node on the current stack frame if this node has a comment
442        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
443            parent: parent_scope,
444            track: teaql_core::TraceNode {
445                entity_type: node.entity.clone(),
446                entity_id: node
447                    .id()
448                    .and_then(|v| match v {
449                        Value::U64(n) => Some(*n),
450                        Value::I64(n) => Some(*n as u64),
451                        _ => None,
452                    }),
453                comment: c.clone(),
454            },
455        });
456        let active_scope = current_scope.as_ref().or(parent_scope);
457
458        let descriptor = self
459            .repository
460            .metadata
461            .context
462            .require_entity(&node.entity)
463            .map_err(RepositoryError::Runtime)?;
464
465        let mut one_relations = Vec::new();
466        let mut many_relations = Vec::new();
467        for (name, children) in std::mem::take(&mut node.relations) {
468            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
469                RepositoryError::Runtime(RuntimeError::MissingRelation {
470                    entity: node.entity.clone(),
471                    relation: name.clone(),
472                })
473            })?;
474            if relation.many {
475                many_relations.push((name, relation.clone(), children));
476            } else {
477                one_relations.push((name, relation.clone(), children));
478            }
479        }
480
481        for (name, relation, children) in one_relations {
482            if children.len() > 1 {
483                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
484                    "relation {}.{} expects one child, got {}",
485                    node.entity,
486                    name,
487                    children.len()
488                ))));
489            }
490            let mut saved_children = Vec::new();
491            for child in children {
492                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
493                let child_repo = self.scoped_repository(child.entity.clone());
494                let saved_child = child_repo.insert_graph_node_scoped(child, active_scope).await?;
495                if relation.attach {
496                    let foreign_value = saved_child
497                        .values
498                        .get(&relation.foreign_key)
499                        .cloned()
500                        .ok_or_else(|| {
501                            RepositoryError::Runtime(RuntimeError::Graph(format!(
502                                "saved child {} missing foreign key {} for relation {}.{}",
503                                relation.target_entity, relation.foreign_key, node.entity, name
504                            )))
505                        })?;
506                    node.values
507                        .insert(relation.local_key.clone(), foreign_value);
508                }
509                saved_children.push(saved_child);
510            }
511            node.relations.insert(name, saved_children);
512        }
513
514        let command = self
515            .prepare_insert_command(&InsertCommand {
516                entity: node.entity.clone(),
517                values: node.values.clone(),
518                trace_chain: Vec::new(),
519            })
520            .map_err(RepositoryError::Runtime)?;
521        let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
522        self.execute_prepared_insert_with_comment(command.clone(), lineage).await?;
523        node.values = command.values;
524
525        for (name, relation, children) in many_relations {
526            let local_value = node
527                .values
528                .get(&relation.local_key)
529                .cloned()
530                .ok_or_else(|| {
531                    RepositoryError::Runtime(RuntimeError::Graph(format!(
532                        "parent {} missing local key {} for relation {}",
533                        node.entity, relation.local_key, name
534                    )))
535                })?;
536            let mut saved_children = Vec::new();
537            for mut child in children {
538                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
539                if relation.attach {
540                    child
541                        .values
542                        .insert(relation.foreign_key.clone(), local_value.clone());
543                }
544                let child_repo = self.scoped_repository(child.entity.clone());
545                saved_children.push(child_repo.insert_graph_node_scoped(child, active_scope).await?);
546            }
547            node.relations.insert(name, saved_children);
548        }
549
550        Ok(node)
551        })
552    }
553
554    fn upsert_graph_node_scoped<'b, 's: 'b>(
555        &'b self,
556        mut node: GraphNode,
557        parent_scope: Option<&'s ScopedCommentNode<'s>>,
558    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<GraphNode, RepositoryError<E::Error>>> + Send + '_>> {
559        Box::pin(async move {
560        // Create scope node on the current stack frame if this node has a comment
561        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
562            parent: parent_scope,
563            track: teaql_core::TraceNode {
564                entity_type: node.entity.clone(),
565                entity_id: node
566                    .id()
567                    .and_then(|v| match v {
568                        Value::U64(n) => Some(*n),
569                        Value::I64(n) => Some(*n as u64),
570                        _ => None,
571                    }),
572                comment: c.clone(),
573            },
574        });
575        let active_scope = current_scope.as_ref().or(parent_scope);
576
577        match node.operation {
578            GraphOperation::Upsert | GraphOperation::Create => {}
579            GraphOperation::Reference => return self.validate_reference_node(node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await,
580            GraphOperation::Remove => {
581                self.validate_remove_node(&node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?;
582                self.delete_graph_node(&node, parent_scope).await?;
583                return Ok(node);
584            }
585        }
586
587        let descriptor = self
588            .repository
589            .metadata
590            .context
591            .require_entity(&node.entity)
592            .map_err(RepositoryError::Runtime)?;
593        let Some(id_property) = descriptor.id_property() else {
594            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
595                "entity {} has no id property for graph upsert",
596                node.entity
597            ))));
598        };
599        let Some(id) = node
600            .values
601            .get(&id_property.name)
602            .filter(|value| !is_unassigned_id_value(value))
603            .cloned()
604        else {
605            // Strip comment to prevent duplicate scope — already captured in active_scope
606            node.comment = None;
607            return self.insert_graph_node_scoped(node, active_scope).await;
608        };
609
610        if node.operation == GraphOperation::Create || self
611            .fetch_graph_current_row(&node.entity, &id_property.name, &id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?
612            .is_none()
613        {
614            node.comment = None;
615            return self.insert_graph_node_scoped(node, active_scope).await;
616        }
617
618        let mut one_relations = Vec::new();
619        let mut many_relations = Vec::new();
620        for (name, children) in std::mem::take(&mut node.relations) {
621            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
622                RepositoryError::Runtime(RuntimeError::MissingRelation {
623                    entity: node.entity.clone(),
624                    relation: name.clone(),
625                })
626            })?;
627            if relation.many {
628                many_relations.push((name, relation.clone(), children));
629            } else {
630                one_relations.push((name, relation.clone(), children));
631            }
632        }
633
634        for (name, relation, children) in one_relations {
635            if children.len() > 1 {
636                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
637                    "relation {}.{} expects one child, got {}",
638                    node.entity,
639                    name,
640                    children.len()
641                ))));
642            }
643            let mut saved_children = Vec::new();
644            for child in children {
645                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
646                let child_repo = self.scoped_repository(child.entity.clone());
647                let saved_child = child_repo.upsert_graph_node_scoped(child, active_scope).await?;
648                if relation.attach {
649                    let foreign_value = saved_child
650                        .values
651                        .get(&relation.foreign_key)
652                        .cloned()
653                        .ok_or_else(|| {
654                            RepositoryError::Runtime(RuntimeError::Graph(format!(
655                                "saved child {} missing foreign key {} for relation {}.{}",
656                                relation.target_entity, relation.foreign_key, node.entity, name
657                            )))
658                        })?;
659                    node.values
660                        .insert(relation.local_key.clone(), foreign_value);
661                }
662                saved_children.push(saved_child);
663            }
664            node.relations.insert(name, saved_children);
665        }
666
667        let update = self.graph_update_command(&mut node, descriptor, id_property, &id)?;
668        if !update.values.is_empty() {
669            let prepared_update = self
670                .prepare_update_command(&update)
671                .map_err(RepositoryError::Runtime)?;
672            let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
673            self.execute_prepared_update_with_comment(prepared_update.clone(), lineage).await?;
674            for (field, value) in &prepared_update.values {
675                node.values.insert(field.clone(), value.clone());
676            }
677            if let Some(version_property) = descriptor.version_property() {
678                if let Some(expected_version) = prepared_update.expected_version {
679                    node.values.insert(
680                        version_property.name.clone(),
681                        Value::I64(expected_version + 1),
682                    );
683                }
684            }
685        }
686
687        for (name, relation, children) in many_relations {
688            let local_value = node
689                .values
690                .get(&relation.local_key)
691                .cloned()
692                .ok_or_else(|| {
693                    RepositoryError::Runtime(RuntimeError::Graph(format!(
694                        "parent {} missing local key {} for relation {}",
695                        node.entity, relation.local_key, name
696                    )))
697                })?;
698            let child_repo = self.scoped_repository(relation.target_entity.clone());
699            let child_descriptor = self
700                .repository
701                .metadata
702                .context
703                .require_entity(&relation.target_entity)
704                .map_err(RepositoryError::Runtime)?;
705            let child_id_property = child_descriptor.id_property().ok_or_else(|| {
706                RepositoryError::Runtime(RuntimeError::Graph(format!(
707                    "entity {} has no id property",
708                    relation.target_entity
709                )))
710            })?;
711
712            let mut seen = std::collections::BTreeSet::new();
713            let mut saved_children = Vec::new();
714            for mut child in children {
715                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
716                if relation.attach && child.operation != GraphOperation::Reference {
717                    child
718                        .values
719                        .insert(relation.foreign_key.clone(), local_value.clone());
720                }
721                if let Some(child_id) = child
722                    .values
723                    .get(&child_id_property.name)
724                    .filter(|value| !is_unassigned_id_value(value))
725                {
726                    let key = graph_identity_key(child_id);
727                    if !seen.insert(key.clone()) {
728                        return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
729                            "duplicate child id {key} in relation {}.{}",
730                            node.entity, name
731                        ))));
732                    }
733                }
734                saved_children.push(child_repo.upsert_graph_node_scoped(child, active_scope).await?);
735            }
736
737
738
739            node.relations.insert(name, saved_children);
740        }
741
742        Ok(node)
743        })
744    }
745
746    async fn validate_reference_node(
747        &self,
748        node: GraphNode,
749        trace_chain: Vec<teaql_core::TraceNode>,
750    ) -> Result<GraphNode, RepositoryError<E::Error>> {
751        if !node.relations.is_empty() {
752            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
753                "reference node {} cannot contain child relations",
754                node.entity
755            ))));
756        }
757        let descriptor = self
758            .repository
759            .metadata
760            .context
761            .require_entity(&node.entity)
762            .map_err(RepositoryError::Runtime)?;
763        let id_property = descriptor.id_property().ok_or_else(|| {
764            RepositoryError::Runtime(RuntimeError::Graph(format!(
765                "entity {} has no id property for graph reference",
766                node.entity
767            )))
768        })?;
769        let id = node
770            .values
771            .get(&id_property.name)
772            .filter(|value| !is_unassigned_id_value(value))
773            .cloned()
774            .ok_or_else(|| {
775                RepositoryError::Runtime(RuntimeError::Graph(format!(
776                    "reference node {} missing id property {}",
777                    node.entity, id_property.name
778                )))
779            })?;
780
781        for field in node.values.keys() {
782            if field == &id_property.name {
783                continue;
784            }
785            if descriptor
786                .version_property()
787                .map(|property| field == &property.name)
788                .unwrap_or(false)
789            {
790                continue;
791            }
792            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
793                "reference node {} cannot carry mutable field {}",
794                node.entity, field
795            ))));
796        }
797
798        let current = self
799            .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain).await?
800            .ok_or_else(|| {
801                RepositoryError::Runtime(RuntimeError::Graph(format!(
802                    "reference node {}({}) does not exist",
803                    node.entity,
804                    graph_identity_key(&id)
805                )))
806            })?;
807
808        if let Some(version_property) = descriptor.version_property() {
809            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
810                if *existing_version < 0 {
811                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
812                        "reference node {}({}) is deleted",
813                        node.entity,
814                        graph_identity_key(&id)
815                    ))));
816                }
817                if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
818                {
819                    if expected_version != existing_version {
820                        println!("OptimisticLockConflict in validate_reference_node! entity={}, expected={}, existing={}", node.entity, expected_version, existing_version);
821                        return Err(RepositoryError::Runtime(
822                            RuntimeError::OptimisticLockConflict {
823                                entity: node.entity,
824                                id: graph_identity_key(&id),
825                            },
826                        ));
827                    }
828                }
829            }
830        }
831
832        Ok(GraphNode {
833            entity: node.entity,
834            values: current,
835            relations: BTreeMap::new(),
836            operation: GraphOperation::Reference,
837            comment: None,
838            dirty_fields: None, original_values: None,
839        })
840    }
841
842    async fn validate_remove_node(&self, node: &GraphNode, trace_chain: Vec<teaql_core::TraceNode>) -> Result<(), RepositoryError<E::Error>> {
843        if !node.relations.is_empty() {
844            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
845                "remove node {} cannot contain child relations",
846                node.entity
847            ))));
848        }
849        let descriptor = self
850            .repository
851            .metadata
852            .context
853            .require_entity(&node.entity)
854            .map_err(RepositoryError::Runtime)?;
855        let id_property = descriptor.id_property().ok_or_else(|| {
856            RepositoryError::Runtime(RuntimeError::Graph(format!(
857                "entity {} has no id property for graph remove",
858                node.entity
859            )))
860        })?;
861        let id = node
862            .values
863            .get(&id_property.name)
864            .filter(|value| !is_unassigned_id_value(value))
865            .cloned()
866            .ok_or_else(|| {
867                RepositoryError::Runtime(RuntimeError::Graph(format!(
868                    "remove node {} missing id property {}",
869                    node.entity, id_property.name
870                )))
871            })?;
872        let current = self
873            .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain).await?
874            .ok_or_else(|| {
875                RepositoryError::Runtime(RuntimeError::Graph(format!(
876                    "remove node {}({}) does not exist",
877                    node.entity,
878                    graph_identity_key(&id)
879                )))
880            })?;
881        if let Some(version_property) = descriptor.version_property() {
882            if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
883                if *existing_version < 0 {
884                    return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
885                        "remove node {}({}) is already deleted",
886                        node.entity,
887                        graph_identity_key(&id)
888                    ))));
889                }
890            }
891        }
892        Ok(())
893    }
894
895    fn graph_node_from_record(
896        &self,
897        entity: &str,
898        record: Record,
899    ) -> Result<GraphNode, RuntimeError> {
900        let descriptor = self.repository.metadata.context.require_entity(entity)?;
901        let mut node = GraphNode::new(entity);
902
903        for (field, value) in record {
904            if field == "_comment" {
905                if let Value::Text(comment) = value {
906                    node.set_comment(comment);
907                }
908                continue;
909            }
910            if field == "_dirty_fields" {
911                if let Value::List(fields) = value {
912                    let mut dirty = std::collections::BTreeSet::new();
913                    for f in fields {
914                        if let Value::Text(t) = f {
915                            dirty.insert(t);
916                        }
917                    }
918                    node.dirty_fields = Some(dirty);
919                }
920                continue;
921            }
922            if field == "_original_values" {
923                if let Value::Object(orig) = value {
924                    node.original_values = Some(orig);
925                }
926                continue;
927            }
928            let Some(relation) = descriptor.relation_by_name(&field) else {
929                node.values.insert(field, value);
930                continue;
931            };
932
933            match value {
934                Value::Null => {
935                    node.relations.entry(field).or_default();
936                }
937                Value::Object(record) => {
938                    let child = self.graph_node_from_record(&relation.target_entity, record)?;
939                    node.relations.entry(field).or_default().push(child);
940                }
941                Value::List(values) => {
942                    let children = node.relations.entry(field.clone()).or_default();
943                    for value in values {
944                        let Value::Object(record) = value else {
945                            return Err(RuntimeError::Graph(format!(
946                                "relation {}.{} expects object children, got {:?}",
947                                entity, field, value
948                            )));
949                        };
950                        children
951                            .push(self.graph_node_from_record(&relation.target_entity, record)?);
952                    }
953                }
954                other => {
955                    return Err(RuntimeError::Graph(format!(
956                        "relation {}.{} expects object/list/null, got {:?}",
957                        entity, field, other
958                    )));
959                }
960            }
961        }
962
963        Ok(node)
964    }
965
966    fn graph_update_command(
967        &self,
968        node: &mut GraphNode,
969        descriptor: &EntityDescriptor,
970        id_property: &PropertyDescriptor,
971        id: &Value,
972    ) -> Result<UpdateCommand, RepositoryError<E::Error>> {
973        crate::mark_record_status(&mut node.values, crate::CheckObjectStatus::Update);
974        let check_result = self
975            .repository
976            .metadata
977            .context
978            .check_and_fix_record(&node.entity, &mut node.values);
979        crate::clear_record_status(&mut node.values);
980        check_result.map_err(RepositoryError::Runtime)?;
981
982        let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
983        command.old_values = node.original_values.clone();
984        if let Some(version_property) = descriptor.version_property() {
985            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
986                command = command.expected_version(*version);
987            }
988        }
989        // Filter properties by dirty_fields when available (Java-style minimal UPDATE).
990        // When dirty_fields is Some, only modified fields are included in the SET clause.
991        // When dirty_fields is None (no tracking), fall back to all fields in node.values.
992        for property in descriptor.properties.iter().filter(|property| {
993            !property.is_id
994                && !property.is_version
995                && property.name != id_property.name
996                && match &node.dirty_fields {
997                    Some(dirty) => dirty.contains(&property.name),
998                    None => node.values.contains_key(&property.name),
999                }
1000        }) {
1001            if let Some(value) = node.values.get(&property.name) {
1002                command.values.insert(property.name.clone(), value.clone());
1003            }
1004        }
1005        Ok(command)
1006    }
1007
1008    fn delete_graph_node<'b, 's: 'b>(
1009        &'b self,
1010        node: &'b GraphNode,
1011        parent_scope: Option<&'s ScopedCommentNode<'s>>,
1012    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, RepositoryError<E::Error>>> + Send + '_>> {
1013        Box::pin(async move {
1014        let descriptor = self
1015            .repository
1016            .metadata
1017            .context
1018            .require_entity(&node.entity)
1019            .map_err(RepositoryError::Runtime)?;
1020        let id_property = descriptor.id_property().ok_or_else(|| {
1021            RepositoryError::Runtime(RuntimeError::Graph(format!(
1022                "entity {} has no id property for graph remove",
1023                node.entity
1024            )))
1025        })?;
1026        let id = node
1027            .values
1028            .get(&id_property.name)
1029            .filter(|value| !is_unassigned_id_value(value))
1030            .cloned()
1031            .ok_or_else(|| {
1032                RepositoryError::Runtime(RuntimeError::Graph(format!(
1033                    "remove node {} missing id property {}",
1034                    node.entity, id_property.name
1035                )))
1036            })?;
1037        let mut delete = DeleteCommand::new(node.entity.clone(), id);
1038        if let Some(version_property) = descriptor.version_property() {
1039            if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
1040                delete = delete.expected_version(*version);
1041            }
1042        }
1043
1044        // Create scope node for deletion if parent/node comment is present
1045        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
1046            parent: parent_scope,
1047            track: teaql_core::TraceNode {
1048                entity_type: node.entity.clone(),
1049                entity_id: node
1050                    .id()
1051                    .and_then(|v| match v {
1052                        Value::U64(n) => Some(*n),
1053                        Value::I64(n) => Some(*n as u64),
1054                        _ => None,
1055                    }),
1056                comment: c.clone(),
1057            },
1058        });
1059        let active_scope = current_scope.as_ref().or(parent_scope);
1060        let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
1061
1062        self.delete_scoped(&delete, lineage).await
1063        })
1064    }
1065
1066
1067    async fn fetch_graph_children(
1068        &self,
1069        entity: &str,
1070        foreign_key: &str,
1071        parent_value: &Value,
1072        trace_chain: Vec<teaql_core::TraceNode>,
1073    ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
1074        let mut query = SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone()));
1075        query.trace_chain = trace_chain;
1076        self.scoped_repository(entity.to_owned()).fetch_all(&query).await
1077    }
1078    pub async fn fetch_graph_current_row(
1079        &self,
1080        entity: &str,
1081        id_property: &str,
1082        id: &teaql_core::Value,
1083        trace_chain: Vec<teaql_core::TraceNode>,
1084    ) -> Result<Option<Record>, RepositoryError<E::Error>> {
1085        let mut query = teaql_core::SelectQuery::new(entity).filter(teaql_core::Expr::eq(id_property, id.clone()));
1086        query.trace_chain = trace_chain;
1087        let mut rows = self
1088            .scoped_repository(entity.to_owned())
1089            .fetch_all(&query).await?;
1090        Ok(rows.pop())
1091    }
1092
1093    pub async fn execute_ledger_plan(&self, root: crate::EntityRoot) -> Result<(), RepositoryError<E::Error>> {
1094        let comment = root.get_comment();
1095        let trace_chain = comment.map(|c| vec![teaql_core::TraceNode {
1096            entity_type: self.entity.clone(),
1097            entity_id: None,
1098            comment: c,
1099        }]).unwrap_or_default();
1100
1101        let deleted_keys = root.deleted_keys();
1102        let new_keys = root.new_keys();
1103        let change_set = root.current_change_set();
1104        
1105        // 1. Execute Deletes
1106        for key in deleted_keys.iter() {
1107            let id = key.id.clone();
1108            let mut cmd = teaql_core::DeleteCommand::new(&key.entity, id);
1109            if let Some(version) = root.get_original_version(key) {
1110                cmd = cmd.expected_version(version);
1111            }
1112            let t = root.get_trace_chain(key);
1113            cmd.trace_chain = if t.is_empty() { trace_chain.clone() } else { t };
1114            self.delete(&cmd).await?;
1115        }
1116
1117        // 2. Execute Updates and Inserts
1118        let mut update_batches: std::collections::BTreeMap<(String, String), Vec<crate::EntityKey>> = std::collections::BTreeMap::new();
1119        let mut insert_batches: std::collections::BTreeMap<String, Vec<crate::EntityKey>> = std::collections::BTreeMap::new();
1120
1121        for (key, record) in change_set.changes() {
1122            if deleted_keys.contains(key) {
1123                continue;
1124            }
1125            let mut is_new = new_keys.contains(key);
1126            
1127            if !is_new {
1128                let descriptor = self.repository.metadata.context.require_entity(&key.entity).map_err(RepositoryError::Runtime)?;
1129                let id_property = descriptor.id_property().ok_or_else(|| {
1130                    RepositoryError::Runtime(RuntimeError::Graph(format!("entity {} has no id property", key.entity)))
1131                })?;
1132                let t = root.get_trace_chain(key);
1133                let my_trace = if t.is_empty() { trace_chain.clone() } else { t };
1134                let current_row = self.fetch_graph_current_row(&key.entity, &id_property.name, &key.id, my_trace).await?;
1135                if current_row.is_none() {
1136                    is_new = true;
1137                }
1138            }
1139            
1140            if is_new {
1141                insert_batches.entry(key.entity.clone()).or_default().push(key.clone());
1142            } else {
1143                let mut fields: Vec<String> = record.keys().cloned().collect();
1144                fields.sort();
1145                let signature = fields.join(",");
1146                update_batches.entry((key.entity.clone(), signature)).or_default().push(key.clone());
1147            }
1148        }
1149
1150        let mut insert_order: Vec<String> = insert_batches.keys().cloned().collect();
1151        insert_order.sort();
1152        println!("execute_ledger_plan: insert_batches={:?}", insert_order);
1153        
1154        for entity in insert_order {
1155            let keys = insert_batches.get(&entity).unwrap();
1156            let descriptor = self.repository.metadata.context.require_entity(&entity).map_err(RepositoryError::Runtime)?;
1157            let mut cmd = teaql_core::BatchInsertCommand::new(&descriptor.table_name);
1158            let mut traces = Vec::new();
1159            for key in keys {
1160                let record = change_set.changes().get(key).unwrap();
1161                let mut db_record = Record::new();
1162                db_record.insert("id".to_owned(), key.id.clone());
1163                for (field, value) in record {
1164                    db_record.insert(field.clone(), value.clone());
1165                }
1166                crate::repository::helpers::ensure_initial_version(&mut db_record, descriptor);
1167                cmd.batch_values.push(db_record);
1168                let t = root.get_trace_chain(key);
1169                let my_trace = if t.is_empty() { trace_chain.clone() } else { t };
1170                traces.push(my_trace);
1171            }
1172            cmd.trace_chains = traces;
1173            self.execute_prepared_batch_insert(cmd).await?;
1174        }
1175
1176        let mut update_order: Vec<(String, String)> = update_batches.keys().cloned().collect();
1177        update_order.sort();
1178        println!("execute_ledger_plan: update_batches={:?}", update_order);
1179
1180        for signature in update_order {
1181            let keys = update_batches.get(&signature).unwrap();
1182            let descriptor = self.repository.metadata.context.require_entity(&signature.0).map_err(RepositoryError::Runtime)?;
1183            let mut update_fields: Vec<String> = signature.0.split(',').map(|s| s.to_string()).collect();
1184            let mut cmd = teaql_core::BatchUpdateCommand::new(&descriptor.table_name, update_fields);
1185            let mut traces = Vec::new();
1186            for key in keys {
1187                let record = change_set.changes().get(key).unwrap();
1188                let mut db_record = Record::new();
1189                db_record.insert("id".to_owned(), key.id.clone());
1190                for (field, value) in record {
1191                    db_record.insert(field.clone(), value.clone());
1192                }
1193                crate::repository::helpers::increment_version(&mut db_record, descriptor, root.get_original_version(key));
1194                cmd.batch_values.push(db_record);
1195                let t = root.get_trace_chain(key);
1196                let my_trace = if t.is_empty() { trace_chain.clone() } else { t };
1197                traces.push(my_trace);
1198            }
1199            cmd.trace_chains = traces;
1200            self.execute_prepared_batch_update(cmd).await?;
1201        }
1202
1203        Ok(())
1204    }
1205}