1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use teaql_core::{
5 DeleteCommand, Entity, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record,
6 SelectQuery, UpdateCommand, Value,
7};
8use teaql_sql::SqlDialect;
9
10use crate::{
11 GraphMutationKind, GraphMutationPlan, GraphNode, GraphOperation, RepositoryError,
12 RuntimeError, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
13};
14use crate::entity_status::EntityStatus;
15
16use super::{GraphTransactionBoundary, QueryExecutor, ResolvedRepository, helpers::*};
17
18impl<'a, D, E> ResolvedRepository<'a, D, E>
19where
20 D: SqlDialect,
21 E: QueryExecutor,
22{
23 pub fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
24 if node.entity != self.entity {
25 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
26 "resolved repository {} cannot save graph root {}",
27 self.entity, node.entity
28 ))));
29 }
30 let boundary = self
31 .repository
32 .executor
33 .begin_transaction()
34 .map_err(RepositoryError::Executor)?;
35 if matches!(boundary, GraphTransactionBoundary::Unsupported) {
36 return Err(RepositoryError::Runtime(RuntimeError::Graph(
37 "save_graph requires a transactional executor".to_owned(),
38 )));
39 }
40 let result = self.upsert_graph_node_scoped(node, None);
41 match result {
42 Ok(saved) => {
43 if matches!(boundary, GraphTransactionBoundary::Started) {
44 self.repository
45 .executor
46 .commit_transaction()
47 .map_err(RepositoryError::Executor)?;
48 }
49 Ok(saved)
50 }
51 Err(err) => {
52 if !matches!(boundary, GraphTransactionBoundary::Unsupported) {
53 self.repository
54 .executor
55 .rollback_transaction()
56 .map_err(RepositoryError::Executor)?;
57 }
58 Err(err)
59 }
60 }
61 }
62
63 pub fn save_entity_graph_from(&self, graph: teaql_core::EntityGraph) -> Result<GraphNode, RepositoryError<E::Error>> {
64 fn convert(node: teaql_core::EntityGraphNode) -> GraphNode {
65 let mut relations = BTreeMap::new();
66 for (rel_name, child) in node.children {
67 relations.entry(rel_name).or_insert_with(Vec::new).push(convert(child));
68 }
69 GraphNode {
70 entity: node.entity_type,
71 values: node.record,
72 relations,
73 operation: match node.operation {
74 teaql_core::EntityGraphOperation::Save => crate::GraphOperation::Upsert,
75 teaql_core::EntityGraphOperation::Delete => crate::GraphOperation::Remove,
76 },
77 comment: node.comment,
78 dirty_fields: None, original_values: None,
79 }
80 }
81 self.save_graph(convert(graph.root))
82 }
83
84 pub fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
85 where
86 T: Entity,
87 {
88 let node = self
89 .graph_node_from_entity(entity)
90 .map_err(RepositoryError::Runtime)?;
91 self.save_graph(node)
92 }
93
94 pub fn save_entity<T>(&self, entity: T, status: EntityStatus) -> Result<GraphNode, RepositoryError<E::Error>>
95 where
96 T: Entity,
97 {
98 if !status.need_persist() {
99 return Ok(GraphNode::new(&self.entity));
100 }
101 if status.is_deleted() {
102 let mut node = self.graph_node_from_entity(entity)
103 .map_err(RepositoryError::Runtime)?;
104 node.operation = GraphOperation::Remove;
105 node.relations.clear();
106 self.save_graph(node)
107 } else {
108 self.save_entity_graph(entity)
109 }
110 }
111 pub fn save_entity_with_comment<T>(&self, entity: T, status: EntityStatus, comment: impl Into<String>) -> Result<GraphNode, RepositoryError<E::Error>>
112 where
113 T: Entity,
114 {
115 if status.is_deleted() {
116 let mut node = self.graph_node_from_entity(entity)
117 .map_err(RepositoryError::Runtime)?;
118 node.operation = GraphOperation::Remove;
119 node.relations.clear();
120 node.set_comment(comment);
121 self.save_graph(node)
122 } else {
123 self.save_entity_graph_with_comment(entity, comment)
124 }
125 }
126 pub fn save_entity_graph_with_comment<T>(
127 &self,
128 entity: T,
129 comment: impl Into<String>,
130 ) -> Result<GraphNode, RepositoryError<E::Error>>
131 where
132 T: Entity,
133 {
134 let mut node = self
135 .graph_node_from_entity(entity)
136 .map_err(RepositoryError::Runtime)?;
137 node.set_comment(comment);
138 self.save_graph(node)
139 }
140
141 pub fn create_entity_graph_with_comment<T>(
145 &self,
146 entity: T,
147 comment: impl Into<String>,
148 ) -> Result<GraphNode, RepositoryError<E::Error>>
149 where
150 T: Entity,
151 {
152 let mut node = self
153 .graph_node_from_entity(entity)
154 .map_err(RepositoryError::Runtime)?;
155 node.operation = GraphOperation::Create;
156 node.set_comment(comment);
157 self.save_graph(node)
158 }
159
160 pub fn plan_graph(
161 &self,
162 node: GraphNode,
163 ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
164 if node.entity != self.entity {
165 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
166 "resolved repository {} cannot plan graph root {}",
167 self.entity, node.entity
168 ))));
169 }
170 let mut node = node;
171 let mut plan = GraphMutationPlan::default();
172 self.collect_graph_plan(&mut node, &mut plan, None, None, false)?;
173 plan.planned_root = Some(node);
174 plan.rebuild_batches();
175 Ok(plan)
176 }
177
178 pub fn execute_graph_plan(
179 &self,
180 plan: GraphMutationPlan,
181 ) -> Result<GraphNode, RepositoryError<E::Error>> {
182 let Some(root) = plan.planned_root else {
183 return Err(RepositoryError::Runtime(RuntimeError::Graph(
184 "graph mutation plan has no planned root".to_owned(),
185 )));
186 };
187
188 for batch in plan.batches {
189 if batch.items.is_empty() {
190 continue;
191 }
192 match batch.kind {
193 GraphMutationKind::Create => {
194 let mut cmd = teaql_core::BatchInsertCommand::new(&batch.entity);
195 for item in batch.items {
196 cmd.batch_values.push(item.values);
197 if let Some(token) = item.scope_token {
198 cmd.trace_chains.push(token.recover_trace_chain());
199 } else {
200 cmd.trace_chains.push(Vec::new());
201 }
202 }
203 self.execute_prepared_batch_insert(cmd)?;
204 }
205 GraphMutationKind::Update => {
206 let mut cmd = teaql_core::BatchUpdateCommand::new(&batch.entity, batch.update_fields);
207 for item in batch.items {
208 let id = item.values.get("id").cloned().ok_or_else(|| {
209 RepositoryError::Runtime(RuntimeError::Graph(format!(
210 "update item in batch missing id for {}", batch.entity
211 )))
212 })?;
213 let version = item.values.get("version").and_then(|v| {
214 if let teaql_core::Value::I64(n) = v { Some(*n) } else { None }
215 });
216 cmd.batch_values.push(item.values);
217 cmd.batch_ids.push(id);
218 cmd.batch_expected_versions.push(version);
219 cmd.batch_old_values.push(item.old_values);
220 if let Some(token) = item.scope_token {
221 cmd.trace_chains.push(token.recover_trace_chain());
222 } else {
223 cmd.trace_chains.push(Vec::new());
224 }
225 }
226 self.execute_prepared_batch_update(cmd)?;
227 }
228 GraphMutationKind::Delete => {
229 for item in batch.items {
231 let id = item.values.get("id").cloned().ok_or_else(|| {
232 RepositoryError::Runtime(RuntimeError::Graph(format!(
233 "delete item in batch missing id for {}", batch.entity
234 )))
235 })?;
236 let mut cmd = teaql_core::DeleteCommand::new(&batch.entity, id);
237 if let Some(teaql_core::Value::I64(version)) = item.values.get("version") {
238 cmd = cmd.expected_version(*version);
239 }
240 let trace_chain = if let Some(token) = item.scope_token { token.recover_trace_chain() } else { Vec::new() };
241 self.delete_scoped(&cmd, trace_chain)?;
242 }
243 }
244 GraphMutationKind::Reference => {
245 }
247 }
248 }
249
250 Ok(root)
251 }
252
253 pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
254 where
255 T: Entity,
256 {
257 let descriptor = T::entity_descriptor();
258 if descriptor.name != self.entity {
259 return Err(RuntimeError::Graph(format!(
260 "resolved repository {} cannot extract graph root {}",
261 self.entity, descriptor.name
262 )));
263 }
264 let dirty_fields = entity.dirty_fields();
267 let original_values = entity.original_values();
268 let is_deleted = entity.is_marked_as_delete();
269 let comment = entity.comment();
270 let mut node = self.graph_node_from_record(&descriptor.name, entity.into_record())?;
271 node.dirty_fields = dirty_fields;
272 node.original_values = original_values;
273 if is_deleted {
274 node.operation = GraphOperation::Remove;
275 node.relations.clear();
276 }
277 if let Some(c) = comment {
278 node.set_comment(c);
279 }
280 Ok(node)
281 }
282
    /// Recursively records the mutation needed for `node` (and its children) in `plan`.
    ///
    /// * `Reference` and `Remove` nodes push a single `Reference` / `Delete` item that
    ///   carries `parent_token`, and their relations are not visited.
    /// * `Upsert` / `Create` nodes push an `Update` item when a row with the node's id
    ///   currently exists (looked up via `fetch_graph_current_row`), otherwise a
    ///   `Create` item. For creates, a missing or unassigned id is generated and
    ///   `ensure_initial_version` is applied, mutating `node.values` in place.
    /// * Children are visited after the parent item has been pushed.
    ///
    /// `parent_scope` / `parent_token` carry the comment lineage of the enclosing nodes;
    /// `parent_is_create` makes an `Upsert` child be planned as a create without
    /// querying for an existing row.
    fn collect_graph_plan<'s>(
        &self,
        node: &mut GraphNode,
        plan: &mut GraphMutationPlan,
        parent_scope: Option<&'s ScopedCommentNode<'s>>,
        parent_token: Option<Arc<TraceScopeToken>>,
        parent_is_create: bool,
    ) -> Result<(), RepositoryError<E::Error>> {
        match node.operation {
            GraphOperation::Reference => {
                plan.push(
                    node.entity.clone(),
                    GraphMutationKind::Reference,
                    node.values.clone(),
                    Vec::new(),
                    parent_token,
                    node.original_values.clone(),
                );
                return Ok(());
            }
            GraphOperation::Remove => {
                plan.push(
                    node.entity.clone(),
                    GraphMutationKind::Delete,
                    node.values.clone(),
                    Vec::new(),
                    parent_token,
                    node.original_values.clone(),
                );
                return Ok(());
            }
            GraphOperation::Upsert | GraphOperation::Create => {}
        }

        let descriptor = self
            .repository
            .metadata
            .context
            .require_entity(&node.entity)
            .map_err(RepositoryError::Runtime)?;

        // A node with a comment opens its own scope; otherwise the parent's scope is
        // reused. The scope is built before any id is generated below, so its
        // `entity_id` reflects the id the node arrived with.
        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
            parent: parent_scope,
            track: teaql_core::TraceNode {
                entity_type: node.entity.clone(),
                entity_id: node.id().and_then(|v| match v {
                    Value::U64(n) => Some(*n),
                    Value::I64(n) => Some(*n as u64),
                    _ => None,
                }),
                comment: c.clone(),
            },
        });
        let active_scope = current_scope.as_ref().or(parent_scope);

        // An id that `is_unassigned_id_value` flags is treated the same as a missing id.
        let id_property = descriptor.id_property().cloned();
        let id = id_property.as_ref().and_then(|property| {
            node.values
                .get(&property.name)
                .filter(|value| !is_unassigned_id_value(value))
                .cloned()
        });

        let is_create_op = node.operation == GraphOperation::Create || (parent_is_create && node.operation == GraphOperation::Upsert);

        // Only a non-create node with an assigned id is checked against the store.
        let is_update = if is_create_op {
            false
        } else {
            match (id_property.as_ref(), id.as_ref()) {
                (Some(id_property), Some(id)) => self
                    .fetch_graph_current_row(&node.entity, &id_property.name, id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default())?
                    .is_some(),
                _ => false,
            }
        };
        if !is_update {
            if let Some(id_property) = id_property.as_ref() {
                let needs_id = !node.values.contains_key(&id_property.name)
                    || node
                        .values
                        .get(&id_property.name)
                        .is_some_and(is_unassigned_id_value);
                if needs_id {
                    let id = self
                        .repository
                        .metadata
                        .context
                        .next_id(&node.entity)
                        .map_err(RepositoryError::Runtime)?;
                    node.values.insert(id_property.name.clone(), Value::U64(id));
                }
            }
            ensure_initial_version(&mut node.values, descriptor);
        }
        // Updates list their fields with the id and version properties excluded;
        // creates carry no update-field list.
        let update_fields = if is_update {
            let mut excluded = Vec::new();
            if let Some(id_property) = id_property.as_ref() {
                excluded.push(id_property.name.clone());
            }
            if let Some(version_property) = descriptor.version_property() {
                excluded.push(version_property.name.clone());
            }
            sorted_update_fields(&node.values, excluded)
        } else {
            Vec::new()
        };

        // The token is built after id generation and records `plan.next_item_index`
        // as read before the `plan.push` call below.
        let current_token = if let Some(c) = &node.comment {
            Some(Arc::new(TraceScopeToken {
                parent: parent_token.clone(),
                track: teaql_core::TraceNode {
                    entity_type: node.entity.clone(),
                    entity_id: node.id().and_then(|v| match v {
                        Value::U64(n) => Some(*n),
                        Value::I64(n) => Some(*n as u64),
                        _ => None,
                    }),
                    comment: c.clone(),
                },
                node_index: plan.next_item_index,
            }))
        } else {
            parent_token.clone()
        };

        plan.push(
            node.entity.clone(),
            if is_update {
                GraphMutationKind::Update
            } else {
                GraphMutationKind::Create
            },
            node.values.clone(),
            update_fields,
            current_token.clone(),
            node.original_values.clone(),
        );

        // Children are planned through a repository scoped to the relation's target.
        for (name, children) in &mut node.relations {
            let relation = descriptor.relation_by_name(name).ok_or_else(|| {
                RepositoryError::Runtime(RuntimeError::MissingRelation {
                    entity: node.entity.clone(),
                    relation: name.clone(),
                })
            })?;
            let child_repo = self.scoped_repository(relation.target_entity.clone());
            for child in children {
                ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
                child_repo.collect_graph_plan(child, plan, active_scope, current_token.clone(), is_create_op)?;
            }
        }
        Ok(())
    }
439
440 fn insert_graph_node_scoped<'s>(
441 &self,
442 mut node: GraphNode,
443 parent_scope: Option<&'s ScopedCommentNode<'s>>,
444 ) -> Result<GraphNode, RepositoryError<E::Error>> {
445 match node.operation {
446 GraphOperation::Upsert | GraphOperation::Create => {}
447 GraphOperation::Reference => return self.validate_reference_node(node, parent_scope.map(|s| s.to_trace_chain()).unwrap_or_default()),
448 GraphOperation::Remove => {
449 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
450 "create graph cannot remove node {}",
451 node.entity
452 ))));
453 }
454 }
455
456 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
458 parent: parent_scope,
459 track: teaql_core::TraceNode {
460 entity_type: node.entity.clone(),
461 entity_id: node
462 .id()
463 .and_then(|v| match v {
464 Value::U64(n) => Some(*n),
465 Value::I64(n) => Some(*n as u64),
466 _ => None,
467 }),
468 comment: c.clone(),
469 },
470 });
471 let active_scope = current_scope.as_ref().or(parent_scope);
472
473 let descriptor = self
474 .repository
475 .metadata
476 .context
477 .require_entity(&node.entity)
478 .map_err(RepositoryError::Runtime)?;
479
480 let mut one_relations = Vec::new();
481 let mut many_relations = Vec::new();
482 for (name, children) in std::mem::take(&mut node.relations) {
483 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
484 RepositoryError::Runtime(RuntimeError::MissingRelation {
485 entity: node.entity.clone(),
486 relation: name.clone(),
487 })
488 })?;
489 if relation.many {
490 many_relations.push((name, relation.clone(), children));
491 } else {
492 one_relations.push((name, relation.clone(), children));
493 }
494 }
495
496 for (name, relation, children) in one_relations {
497 if children.len() > 1 {
498 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
499 "relation {}.{} expects one child, got {}",
500 node.entity,
501 name,
502 children.len()
503 ))));
504 }
505 let mut saved_children = Vec::new();
506 for child in children {
507 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
508 let child_repo = self.scoped_repository(child.entity.clone());
509 let saved_child = child_repo.insert_graph_node_scoped(child, active_scope)?;
510 if relation.attach {
511 let foreign_value = saved_child
512 .values
513 .get(&relation.foreign_key)
514 .cloned()
515 .ok_or_else(|| {
516 RepositoryError::Runtime(RuntimeError::Graph(format!(
517 "saved child {} missing foreign key {} for relation {}.{}",
518 relation.target_entity, relation.foreign_key, node.entity, name
519 )))
520 })?;
521 node.values
522 .insert(relation.local_key.clone(), foreign_value);
523 }
524 saved_children.push(saved_child);
525 }
526 node.relations.insert(name, saved_children);
527 }
528
529 let command = self
530 .prepare_insert_command(&InsertCommand {
531 entity: node.entity.clone(),
532 values: node.values.clone(),
533 trace_chain: Vec::new(),
534 })
535 .map_err(RepositoryError::Runtime)?;
536 let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
537 self.execute_prepared_insert_with_comment(command.clone(), lineage)?;
538 node.values = command.values;
539
540 for (name, relation, children) in many_relations {
541 let local_value = node
542 .values
543 .get(&relation.local_key)
544 .cloned()
545 .ok_or_else(|| {
546 RepositoryError::Runtime(RuntimeError::Graph(format!(
547 "parent {} missing local key {} for relation {}",
548 node.entity, relation.local_key, name
549 )))
550 })?;
551 let mut saved_children = Vec::new();
552 for mut child in children {
553 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
554 if relation.attach {
555 child
556 .values
557 .insert(relation.foreign_key.clone(), local_value.clone());
558 }
559 let child_repo = self.scoped_repository(child.entity.clone());
560 saved_children.push(child_repo.insert_graph_node_scoped(child, active_scope)?);
561 }
562 node.relations.insert(name, saved_children);
563 }
564
565 Ok(node)
566 }
567
    /// Inserts or updates `node` and, recursively, its related children.
    ///
    /// * `Reference` nodes are only validated; `Remove` nodes are validated and then
    ///   deleted.
    /// * A node without an assigned id, a `Create` node, or a node whose id has no
    ///   current row is handed to `insert_graph_node_scoped`.
    /// * Otherwise to-one children are upserted first, then this node is updated (when
    ///   the update command has values or an expected version), then to-many children
    ///   are upserted.
    ///
    /// `parent_scope` is the comment lineage of the enclosing nodes; a node with a
    /// comment opens its own scope for itself and its children.
    fn upsert_graph_node_scoped<'s>(
        &self,
        mut node: GraphNode,
        parent_scope: Option<&'s ScopedCommentNode<'s>>,
    ) -> Result<GraphNode, RepositoryError<E::Error>> {
        let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
            parent: parent_scope,
            track: teaql_core::TraceNode {
                entity_type: node.entity.clone(),
                entity_id: node
                    .id()
                    .and_then(|v| match v {
                        Value::U64(n) => Some(*n),
                        Value::I64(n) => Some(*n as u64),
                        _ => None,
                    }),
                comment: c.clone(),
            },
        });
        let active_scope = current_scope.as_ref().or(parent_scope);

        match node.operation {
            GraphOperation::Upsert | GraphOperation::Create => {}
            GraphOperation::Reference => return self.validate_reference_node(node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()),
            GraphOperation::Remove => {
                self.validate_remove_node(&node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default())?;
                // `delete_graph_node` rebuilds the node's own scope from its comment,
                // so it receives the parent scope rather than `active_scope`.
                self.delete_graph_node(&node, parent_scope)?;
                return Ok(node);
            }
        }

        let descriptor = self
            .repository
            .metadata
            .context
            .require_entity(&node.entity)
            .map_err(RepositoryError::Runtime)?;
        let Some(id_property) = descriptor.id_property() else {
            return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
                "entity {} has no id property for graph upsert",
                node.entity
            ))));
        };
        let Some(id) = node
            .values
            .get(&id_property.name)
            .filter(|value| !is_unassigned_id_value(value))
            .cloned()
        else {
            // The comment is already represented by `active_scope`; clearing it keeps
            // the insert path from opening a second scope for the same node.
            node.comment = None;
            return self.insert_graph_node_scoped(node, active_scope);
        };

        if node.operation == GraphOperation::Create || self
            .fetch_graph_current_row(&node.entity, &id_property.name, &id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default())?
            .is_none()
        {
            node.comment = None;
            return self.insert_graph_node_scoped(node, active_scope);
        }

        // Split relations by cardinality: to-one children are saved before this node
        // is updated, to-many children after.
        let mut one_relations = Vec::new();
        let mut many_relations = Vec::new();
        for (name, children) in std::mem::take(&mut node.relations) {
            let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
                RepositoryError::Runtime(RuntimeError::MissingRelation {
                    entity: node.entity.clone(),
                    relation: name.clone(),
                })
            })?;
            if relation.many {
                many_relations.push((name, relation.clone(), children));
            } else {
                one_relations.push((name, relation.clone(), children));
            }
        }

        for (name, relation, children) in one_relations {
            if children.len() > 1 {
                return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
                    "relation {}.{} expects one child, got {}",
                    node.entity,
                    name,
                    children.len()
                ))));
            }
            let mut saved_children = Vec::new();
            for child in children {
                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
                let child_repo = self.scoped_repository(child.entity.clone());
                let saved_child = child_repo.upsert_graph_node_scoped(child, active_scope)?;
                // For attach relations the saved child's foreign-key value becomes
                // this node's local-key value.
                if relation.attach {
                    let foreign_value = saved_child
                        .values
                        .get(&relation.foreign_key)
                        .cloned()
                        .ok_or_else(|| {
                            RepositoryError::Runtime(RuntimeError::Graph(format!(
                                "saved child {} missing foreign key {} for relation {}.{}",
                                relation.target_entity, relation.foreign_key, node.entity, name
                            )))
                        })?;
                    node.values
                        .insert(relation.local_key.clone(), foreign_value);
                }
                saved_children.push(saved_child);
            }
            node.relations.insert(name, saved_children);
        }

        // Skip the statement entirely when there is nothing to write and no version
        // to check.
        let update = self.graph_update_command(&mut node, descriptor, id_property, &id)?;
        if !update.values.is_empty() || update.expected_version.is_some() {
            let prepared_update = self
                .prepare_update_command(&update)
                .map_err(RepositoryError::Runtime)?;
            let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
            self.execute_prepared_update_with_comment(prepared_update.clone(), lineage)?;
            for (field, value) in &prepared_update.values {
                node.values.insert(field.clone(), value.clone());
            }
            // The returned node carries expected_version + 1 as its version.
            if let Some(version_property) = descriptor.version_property() {
                if let Some(expected_version) = prepared_update.expected_version {
                    node.values.insert(
                        version_property.name.clone(),
                        Value::I64(expected_version + 1),
                    );
                }
            }
        }

        for (name, relation, children) in many_relations {
            let local_value = node
                .values
                .get(&relation.local_key)
                .cloned()
                .ok_or_else(|| {
                    RepositoryError::Runtime(RuntimeError::Graph(format!(
                        "parent {} missing local key {} for relation {}",
                        node.entity, relation.local_key, name
                    )))
                })?;
            let child_repo = self.scoped_repository(relation.target_entity.clone());
            let child_descriptor = self
                .repository
                .metadata
                .context
                .require_entity(&relation.target_entity)
                .map_err(RepositoryError::Runtime)?;
            let child_id_property = child_descriptor.id_property().ok_or_else(|| {
                RepositoryError::Runtime(RuntimeError::Graph(format!(
                    "entity {} has no id property",
                    relation.target_entity
                )))
            })?;

            // `seen` rejects two children with the same assigned id in one relation.
            let mut seen = std::collections::BTreeSet::new();
            let mut saved_children = Vec::new();
            for mut child in children {
                ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
                // Reference children are left untouched: `validate_reference_node`
                // rejects any field besides the id and version.
                if relation.attach && child.operation != GraphOperation::Reference {
                    child
                        .values
                        .insert(relation.foreign_key.clone(), local_value.clone());
                }
                if let Some(child_id) = child
                    .values
                    .get(&child_id_property.name)
                    .filter(|value| !is_unassigned_id_value(value))
                {
                    let key = graph_identity_key(child_id);
                    if !seen.insert(key.clone()) {
                        return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
                            "duplicate child id {key} in relation {}.{}",
                            node.entity, name
                        ))));
                    }
                }
                saved_children.push(child_repo.upsert_graph_node_scoped(child, active_scope)?);
            }

            node.relations.insert(name, saved_children);
        }

        Ok(node)
    }
757
758 fn validate_reference_node(
759 &self,
760 node: GraphNode,
761 trace_chain: Vec<teaql_core::TraceNode>,
762 ) -> Result<GraphNode, RepositoryError<E::Error>> {
763 if !node.relations.is_empty() {
764 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
765 "reference node {} cannot contain child relations",
766 node.entity
767 ))));
768 }
769 let descriptor = self
770 .repository
771 .metadata
772 .context
773 .require_entity(&node.entity)
774 .map_err(RepositoryError::Runtime)?;
775 let id_property = descriptor.id_property().ok_or_else(|| {
776 RepositoryError::Runtime(RuntimeError::Graph(format!(
777 "entity {} has no id property for graph reference",
778 node.entity
779 )))
780 })?;
781 let id = node
782 .values
783 .get(&id_property.name)
784 .filter(|value| !is_unassigned_id_value(value))
785 .cloned()
786 .ok_or_else(|| {
787 RepositoryError::Runtime(RuntimeError::Graph(format!(
788 "reference node {} missing id property {}",
789 node.entity, id_property.name
790 )))
791 })?;
792
793 for field in node.values.keys() {
794 if field == &id_property.name {
795 continue;
796 }
797 if descriptor
798 .version_property()
799 .map(|property| field == &property.name)
800 .unwrap_or(false)
801 {
802 continue;
803 }
804 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
805 "reference node {} cannot carry mutable field {}",
806 node.entity, field
807 ))));
808 }
809
810 let current = self
811 .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain)?
812 .ok_or_else(|| {
813 RepositoryError::Runtime(RuntimeError::Graph(format!(
814 "reference node {}({}) does not exist",
815 node.entity,
816 graph_identity_key(&id)
817 )))
818 })?;
819
820 if let Some(version_property) = descriptor.version_property() {
821 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
822 if *existing_version < 0 {
823 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
824 "reference node {}({}) is deleted",
825 node.entity,
826 graph_identity_key(&id)
827 ))));
828 }
829 if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
830 {
831 if expected_version != existing_version {
832 return Err(RepositoryError::Runtime(
833 RuntimeError::OptimisticLockConflict {
834 entity: node.entity,
835 id: graph_identity_key(&id),
836 },
837 ));
838 }
839 }
840 }
841 }
842
843 Ok(GraphNode {
844 entity: node.entity,
845 values: current,
846 relations: BTreeMap::new(),
847 operation: GraphOperation::Reference,
848 comment: None,
849 dirty_fields: None, original_values: None,
850 })
851 }
852
853 fn validate_remove_node(&self, node: &GraphNode, trace_chain: Vec<teaql_core::TraceNode>) -> Result<(), RepositoryError<E::Error>> {
854 if !node.relations.is_empty() {
855 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
856 "remove node {} cannot contain child relations",
857 node.entity
858 ))));
859 }
860 let descriptor = self
861 .repository
862 .metadata
863 .context
864 .require_entity(&node.entity)
865 .map_err(RepositoryError::Runtime)?;
866 let id_property = descriptor.id_property().ok_or_else(|| {
867 RepositoryError::Runtime(RuntimeError::Graph(format!(
868 "entity {} has no id property for graph remove",
869 node.entity
870 )))
871 })?;
872 let id = node
873 .values
874 .get(&id_property.name)
875 .filter(|value| !is_unassigned_id_value(value))
876 .cloned()
877 .ok_or_else(|| {
878 RepositoryError::Runtime(RuntimeError::Graph(format!(
879 "remove node {} missing id property {}",
880 node.entity, id_property.name
881 )))
882 })?;
883 let current = self
884 .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain)?
885 .ok_or_else(|| {
886 RepositoryError::Runtime(RuntimeError::Graph(format!(
887 "remove node {}({}) does not exist",
888 node.entity,
889 graph_identity_key(&id)
890 )))
891 })?;
892 if let Some(version_property) = descriptor.version_property() {
893 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
894 if *existing_version < 0 {
895 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
896 "remove node {}({}) is already deleted",
897 node.entity,
898 graph_identity_key(&id)
899 ))));
900 }
901 }
902 }
903 Ok(())
904 }
905
906 fn graph_node_from_record(
907 &self,
908 entity: &str,
909 record: Record,
910 ) -> Result<GraphNode, RuntimeError> {
911 let descriptor = self.repository.metadata.context.require_entity(entity)?;
912 let mut node = GraphNode::new(entity);
913
914 for (field, value) in record {
915 if field == "_comment" {
916 if let Value::Text(comment) = value {
917 node.set_comment(comment);
918 }
919 continue;
920 }
921 let Some(relation) = descriptor.relation_by_name(&field) else {
922 node.values.insert(field, value);
923 continue;
924 };
925
926 match value {
927 Value::Null => {
928 node.relations.entry(field).or_default();
929 }
930 Value::Object(record) => {
931 let child = self.graph_node_from_record(&relation.target_entity, record)?;
932 node.relations.entry(field).or_default().push(child);
933 }
934 Value::List(values) => {
935 let children = node.relations.entry(field.clone()).or_default();
936 for value in values {
937 let Value::Object(record) = value else {
938 return Err(RuntimeError::Graph(format!(
939 "relation {}.{} expects object children, got {:?}",
940 entity, field, value
941 )));
942 };
943 children
944 .push(self.graph_node_from_record(&relation.target_entity, record)?);
945 }
946 }
947 other => {
948 return Err(RuntimeError::Graph(format!(
949 "relation {}.{} expects object/list/null, got {:?}",
950 entity, field, other
951 )));
952 }
953 }
954 }
955
956 Ok(node)
957 }
958
959 fn graph_update_command(
960 &self,
961 node: &mut GraphNode,
962 descriptor: &EntityDescriptor,
963 id_property: &PropertyDescriptor,
964 id: &Value,
965 ) -> Result<UpdateCommand, RepositoryError<E::Error>> {
966 crate::mark_record_status(&mut node.values, crate::CheckObjectStatus::Update);
967 let check_result = self
968 .repository
969 .metadata
970 .context
971 .check_and_fix_record(&node.entity, &mut node.values);
972 crate::clear_record_status(&mut node.values);
973 check_result.map_err(RepositoryError::Runtime)?;
974
975 let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
976 command.old_values = node.original_values.clone();
977 if let Some(version_property) = descriptor.version_property() {
978 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
979 command = command.expected_version(*version);
980 }
981 }
982 for property in descriptor.properties.iter().filter(|property| {
986 !property.is_id
987 && !property.is_version
988 && property.name != id_property.name
989 && match &node.dirty_fields {
990 Some(dirty) => dirty.contains(&property.name),
991 None => node.values.contains_key(&property.name),
992 }
993 }) {
994 if let Some(value) = node.values.get(&property.name) {
995 command.values.insert(property.name.clone(), value.clone());
996 }
997 }
998 Ok(command)
999 }
1000
1001 fn delete_graph_node<'s>(
1002 &self,
1003 node: &GraphNode,
1004 parent_scope: Option<&'s ScopedCommentNode<'s>>,
1005 ) -> Result<u64, RepositoryError<E::Error>> {
1006 let descriptor = self
1007 .repository
1008 .metadata
1009 .context
1010 .require_entity(&node.entity)
1011 .map_err(RepositoryError::Runtime)?;
1012 let id_property = descriptor.id_property().ok_or_else(|| {
1013 RepositoryError::Runtime(RuntimeError::Graph(format!(
1014 "entity {} has no id property for graph remove",
1015 node.entity
1016 )))
1017 })?;
1018 let id = node
1019 .values
1020 .get(&id_property.name)
1021 .filter(|value| !is_unassigned_id_value(value))
1022 .cloned()
1023 .ok_or_else(|| {
1024 RepositoryError::Runtime(RuntimeError::Graph(format!(
1025 "remove node {} missing id property {}",
1026 node.entity, id_property.name
1027 )))
1028 })?;
1029 let mut delete = DeleteCommand::new(node.entity.clone(), id);
1030 if let Some(version_property) = descriptor.version_property() {
1031 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
1032 delete = delete.expected_version(*version);
1033 }
1034 }
1035
1036 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
1038 parent: parent_scope,
1039 track: teaql_core::TraceNode {
1040 entity_type: node.entity.clone(),
1041 entity_id: node
1042 .id()
1043 .and_then(|v| match v {
1044 Value::U64(n) => Some(*n),
1045 Value::I64(n) => Some(*n as u64),
1046 _ => None,
1047 }),
1048 comment: c.clone(),
1049 },
1050 });
1051 let active_scope = current_scope.as_ref().or(parent_scope);
1052 let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
1053
1054 self.delete_scoped(&delete, lineage)
1055 }
1056
1057 pub fn fetch_graph_current_row(
1058 &self,
1059 entity: &str,
1060 id_property: &str,
1061 id: &Value,
1062 trace_chain: Vec<teaql_core::TraceNode>,
1063 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
1064 let mut query = SelectQuery::new(entity).filter(Expr::eq(id_property, id.clone()));
1065 query.trace_chain = trace_chain;
1066 let mut rows = self
1067 .scoped_repository(entity.to_owned())
1068 .fetch_all(&query)?;
1069 Ok(rows.pop())
1070 }
1071
1072 fn fetch_graph_children(
1073 &self,
1074 entity: &str,
1075 foreign_key: &str,
1076 parent_value: &Value,
1077 trace_chain: Vec<teaql_core::TraceNode>,
1078 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
1079 let mut query = SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone()));
1080 query.trace_chain = trace_chain;
1081 self.scoped_repository(entity.to_owned()).fetch_all(&query)
1082 }
1083}