1use std::collections::BTreeMap;
2
3use teaql_core::{
4 DeleteCommand, Entity, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record,
5 SelectQuery, UpdateCommand, Value,
6};
7use teaql_sql::SqlDialect;
8
9use crate::{
10 CommentTrack, GraphMutationKind, GraphMutationPlan, GraphNode, GraphOperation, RepositoryError,
11 RuntimeError, ScopedCommentNode, sorted_update_fields,
12};
13use crate::entity_status::EntityStatus;
14
15use super::{GraphTransactionBoundary, QueryExecutor, ResolvedRepository, helpers::*};
16
17impl<'a, D, E> ResolvedRepository<'a, D, E>
18where
19 D: SqlDialect,
20 E: QueryExecutor,
21{
22 pub fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
23 if node.entity != self.entity {
24 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
25 "resolved repository {} cannot save graph root {}",
26 self.entity, node.entity
27 ))));
28 }
29 let boundary = self
30 .repository
31 .executor
32 .begin_transaction()
33 .map_err(RepositoryError::Executor)?;
34 if matches!(boundary, GraphTransactionBoundary::Unsupported) {
35 return Err(RepositoryError::Runtime(RuntimeError::Graph(
36 "save_graph requires a transactional executor".to_owned(),
37 )));
38 }
39 let result = self.upsert_graph_node_scoped(node, None);
40 match result {
41 Ok(saved) => {
42 if matches!(boundary, GraphTransactionBoundary::Started) {
43 self.repository
44 .executor
45 .commit_transaction()
46 .map_err(RepositoryError::Executor)?;
47 }
48 Ok(saved)
49 }
50 Err(err) => {
51 if !matches!(boundary, GraphTransactionBoundary::Unsupported) {
52 self.repository
53 .executor
54 .rollback_transaction()
55 .map_err(RepositoryError::Executor)?;
56 }
57 Err(err)
58 }
59 }
60 }
61
62
63 pub fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
64 where
65 T: Entity,
66 {
67 let node = self
68 .graph_node_from_entity(entity)
69 .map_err(RepositoryError::Runtime)?;
70 self.save_graph(node)
71 }
72
73 pub fn save_entity<T>(&self, entity: T, status: EntityStatus) -> Result<GraphNode, RepositoryError<E::Error>>
74 where
75 T: Entity,
76 {
77 if !status.need_persist() {
78 return Ok(GraphNode::new(&self.entity));
79 }
80 if status.is_deleted() {
81 let mut node = self.graph_node_from_entity(entity)
82 .map_err(RepositoryError::Runtime)?;
83 node.operation = GraphOperation::Remove;
84 node.relations.clear();
85 self.save_graph(node)
86 } else {
87 self.save_entity_graph(entity)
88 }
89 }
90 pub fn save_entity_with_comment<T>(&self, entity: T, status: EntityStatus, comment: impl Into<String>) -> Result<GraphNode, RepositoryError<E::Error>>
91 where
92 T: Entity,
93 {
94 if status.is_deleted() {
95 let mut node = self.graph_node_from_entity(entity)
96 .map_err(RepositoryError::Runtime)?;
97 node.operation = GraphOperation::Remove;
98 node.relations.clear();
99 node.set_comment(comment);
100 self.save_graph(node)
101 } else {
102 self.save_entity_graph_with_comment(entity, comment)
103 }
104 }
105 pub fn save_entity_graph_with_comment<T>(
106 &self,
107 entity: T,
108 comment: impl Into<String>,
109 ) -> Result<GraphNode, RepositoryError<E::Error>>
110 where
111 T: Entity,
112 {
113 let mut node = self
114 .graph_node_from_entity(entity)
115 .map_err(RepositoryError::Runtime)?;
116 node.set_comment(comment);
117 self.save_graph(node)
118 }
119
120 pub fn create_entity_graph_with_comment<T>(
124 &self,
125 entity: T,
126 comment: impl Into<String>,
127 ) -> Result<GraphNode, RepositoryError<E::Error>>
128 where
129 T: Entity,
130 {
131 let mut node = self
132 .graph_node_from_entity(entity)
133 .map_err(RepositoryError::Runtime)?;
134 node.operation = GraphOperation::Create;
135 node.set_comment(comment);
136 self.save_graph(node)
137 }
138
139 pub fn plan_graph(
140 &self,
141 node: GraphNode,
142 ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
143 if node.entity != self.entity {
144 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
145 "resolved repository {} cannot plan graph root {}",
146 self.entity, node.entity
147 ))));
148 }
149 let mut node = node;
150 let mut plan = GraphMutationPlan::default();
151 self.collect_graph_plan(&mut node, &mut plan, None, false)?;
152 plan.planned_root = Some(node);
153 plan.rebuild_batches();
154 Ok(plan)
155 }
156
157 pub fn execute_graph_plan(
158 &self,
159 plan: GraphMutationPlan,
160 ) -> Result<GraphNode, RepositoryError<E::Error>> {
161 let Some(root) = plan.planned_root else {
162 return Err(RepositoryError::Runtime(RuntimeError::Graph(
163 "graph mutation plan has no planned root".to_owned(),
164 )));
165 };
166
167 self.upsert_graph_node_scoped(root, None)
168 }
169
170 pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
171 where
172 T: Entity,
173 {
174 let descriptor = T::entity_descriptor();
175 if descriptor.name != self.entity {
176 return Err(RuntimeError::Graph(format!(
177 "resolved repository {} cannot extract graph root {}",
178 self.entity, descriptor.name
179 )));
180 }
181 self.graph_node_from_record(&descriptor.name, entity.into_record())
182 }
183
184 fn collect_graph_plan<'s>(
185 &self,
186 node: &mut GraphNode,
187 plan: &mut GraphMutationPlan,
188 parent_scope: Option<&'s ScopedCommentNode<'s>>,
189 parent_is_create: bool,
190 ) -> Result<(), RepositoryError<E::Error>> {
191 match node.operation {
192 GraphOperation::Reference => {
193 plan.push(
194 node.entity.clone(),
195 GraphMutationKind::Reference,
196 node.values.clone(),
197 Vec::new(),
198 );
199 return Ok(());
200 }
201 GraphOperation::Remove => {
202 plan.push(
203 node.entity.clone(),
204 GraphMutationKind::Delete,
205 node.values.clone(),
206 Vec::new(),
207 );
208 return Ok(());
209 }
210 GraphOperation::Upsert | GraphOperation::Create => {}
211 }
212
213 let descriptor = self
214 .repository
215 .metadata
216 .context
217 .require_entity(&node.entity)
218 .map_err(RepositoryError::Runtime)?;
219
220 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
222 parent: parent_scope,
223 track: CommentTrack {
224 entity_type: node.entity.clone(),
225 entity_id: node
226 .id()
227 .map(|v| match v {
228 Value::U64(n) => n.to_string(),
229 Value::I64(n) => n.to_string(),
230 Value::Text(s) => s.clone(),
231 other => format!("{other:?}"),
232 })
233 .unwrap_or_else(|| "(pending)".into()),
234 comment: c.clone(),
235 },
236 });
237 let active_scope = current_scope.as_ref().or(parent_scope);
238
239 let id_property = descriptor.id_property().cloned();
240 let id = id_property.as_ref().and_then(|property| {
241 node.values
242 .get(&property.name)
243 .filter(|value| !is_unassigned_id_value(value))
244 .cloned()
245 });
246
247 let is_create_op = node.operation == GraphOperation::Create || (parent_is_create && node.operation == GraphOperation::Upsert);
248
249 let is_update = if is_create_op {
250 false
251 } else {
252 match (id_property.as_ref(), id.as_ref()) {
253 (Some(id_property), Some(id)) => self
254 .fetch_graph_current_row(&node.entity, &id_property.name, id, active_scope.map(|s| s.to_lineage_string()))?
255 .is_some(),
256 _ => false,
257 }
258 };
259 if !is_update {
260 if let Some(id_property) = id_property.as_ref() {
261 let needs_id = !node.values.contains_key(&id_property.name)
262 || node
263 .values
264 .get(&id_property.name)
265 .is_some_and(is_unassigned_id_value);
266 if needs_id {
267 let id = self
268 .repository
269 .metadata
270 .context
271 .next_id(&node.entity)
272 .map_err(RepositoryError::Runtime)?;
273 node.values.insert(id_property.name.clone(), Value::U64(id));
274 }
275 }
276 ensure_initial_version(&mut node.values, descriptor);
277 }
278 let update_fields = if is_update {
279 let mut excluded = Vec::new();
280 if let Some(id_property) = id_property.as_ref() {
281 excluded.push(id_property.name.clone());
282 }
283 if let Some(version_property) = descriptor.version_property() {
284 excluded.push(version_property.name.clone());
285 }
286 sorted_update_fields(&node.values, excluded)
287 } else {
288 Vec::new()
289 };
290 plan.push(
291 node.entity.clone(),
292 if is_update {
293 GraphMutationKind::Update
294 } else {
295 GraphMutationKind::Create
296 },
297 node.values.clone(),
298 update_fields,
299 );
300
301 for (name, children) in &mut node.relations {
302 let relation = descriptor.relation_by_name(name).ok_or_else(|| {
303 RepositoryError::Runtime(RuntimeError::MissingRelation {
304 entity: node.entity.clone(),
305 relation: name.clone(),
306 })
307 })?;
308 let child_repo = self.scoped_repository(relation.target_entity.clone());
309 for child in children {
310 ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
311 child_repo.collect_graph_plan(child, plan, active_scope, is_create_op)?;
312 }
313 }
314 Ok(())
315 }
316
317 fn insert_graph_node_scoped<'s>(
318 &self,
319 mut node: GraphNode,
320 parent_scope: Option<&'s ScopedCommentNode<'s>>,
321 ) -> Result<GraphNode, RepositoryError<E::Error>> {
322 match node.operation {
323 GraphOperation::Upsert | GraphOperation::Create => {}
324 GraphOperation::Reference => return self.validate_reference_node(node, parent_scope.map(|s| s.to_lineage_string())),
325 GraphOperation::Remove => {
326 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
327 "create graph cannot remove node {}",
328 node.entity
329 ))));
330 }
331 }
332
333 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
335 parent: parent_scope,
336 track: CommentTrack {
337 entity_type: node.entity.clone(),
338 entity_id: node
339 .id()
340 .map(|v| match v {
341 Value::U64(n) => n.to_string(),
342 Value::I64(n) => n.to_string(),
343 Value::Text(s) => s.clone(),
344 other => format!("{other:?}"),
345 })
346 .unwrap_or_else(|| "(pending)".into()),
347 comment: c.clone(),
348 },
349 });
350 let active_scope = current_scope.as_ref().or(parent_scope);
351
352 let descriptor = self
353 .repository
354 .metadata
355 .context
356 .require_entity(&node.entity)
357 .map_err(RepositoryError::Runtime)?;
358
359 let mut one_relations = Vec::new();
360 let mut many_relations = Vec::new();
361 for (name, children) in std::mem::take(&mut node.relations) {
362 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
363 RepositoryError::Runtime(RuntimeError::MissingRelation {
364 entity: node.entity.clone(),
365 relation: name.clone(),
366 })
367 })?;
368 if relation.many {
369 many_relations.push((name, relation.clone(), children));
370 } else {
371 one_relations.push((name, relation.clone(), children));
372 }
373 }
374
375 for (name, relation, children) in one_relations {
376 if children.len() > 1 {
377 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
378 "relation {}.{} expects one child, got {}",
379 node.entity,
380 name,
381 children.len()
382 ))));
383 }
384 let mut saved_children = Vec::new();
385 for child in children {
386 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
387 let child_repo = self.scoped_repository(child.entity.clone());
388 let saved_child = child_repo.insert_graph_node_scoped(child, active_scope)?;
389 if relation.attach {
390 let foreign_value = saved_child
391 .values
392 .get(&relation.foreign_key)
393 .cloned()
394 .ok_or_else(|| {
395 RepositoryError::Runtime(RuntimeError::Graph(format!(
396 "saved child {} missing foreign key {} for relation {}.{}",
397 relation.target_entity, relation.foreign_key, node.entity, name
398 )))
399 })?;
400 node.values
401 .insert(relation.local_key.clone(), foreign_value);
402 }
403 saved_children.push(saved_child);
404 }
405 node.relations.insert(name, saved_children);
406 }
407
408 let command = self
409 .prepare_insert_command(&InsertCommand {
410 entity: node.entity.clone(),
411 values: node.values.clone(),
412 })
413 .map_err(RepositoryError::Runtime)?;
414 let lineage = active_scope.map(|s| s.to_lineage_string());
415 self.execute_prepared_insert_with_comment(command.clone(), lineage)?;
416 node.values = command.values;
417
418 for (name, relation, children) in many_relations {
419 let local_value = node
420 .values
421 .get(&relation.local_key)
422 .cloned()
423 .ok_or_else(|| {
424 RepositoryError::Runtime(RuntimeError::Graph(format!(
425 "parent {} missing local key {} for relation {}",
426 node.entity, relation.local_key, name
427 )))
428 })?;
429 let mut saved_children = Vec::new();
430 for mut child in children {
431 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
432 if relation.attach {
433 child
434 .values
435 .insert(relation.foreign_key.clone(), local_value.clone());
436 }
437 let child_repo = self.scoped_repository(child.entity.clone());
438 saved_children.push(child_repo.insert_graph_node_scoped(child, active_scope)?);
439 }
440 node.relations.insert(name, saved_children);
441 }
442
443 Ok(node)
444 }
445
446 fn upsert_graph_node_scoped<'s>(
447 &self,
448 mut node: GraphNode,
449 parent_scope: Option<&'s ScopedCommentNode<'s>>,
450 ) -> Result<GraphNode, RepositoryError<E::Error>> {
451 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
453 parent: parent_scope,
454 track: CommentTrack {
455 entity_type: node.entity.clone(),
456 entity_id: node
457 .id()
458 .map(|v| match v {
459 Value::U64(n) => n.to_string(),
460 Value::I64(n) => n.to_string(),
461 Value::Text(s) => s.clone(),
462 other => format!("{other:?}"),
463 })
464 .unwrap_or_else(|| "(pending)".into()),
465 comment: c.clone(),
466 },
467 });
468 let active_scope = current_scope.as_ref().or(parent_scope);
469
470 match node.operation {
471 GraphOperation::Upsert | GraphOperation::Create => {}
472 GraphOperation::Reference => return self.validate_reference_node(node, active_scope.map(|s| s.to_lineage_string())),
473 GraphOperation::Remove => {
474 self.validate_remove_node(&node, active_scope.map(|s| s.to_lineage_string()))?;
475 self.delete_graph_node(&node, parent_scope)?;
476 return Ok(node);
477 }
478 }
479
480 let descriptor = self
481 .repository
482 .metadata
483 .context
484 .require_entity(&node.entity)
485 .map_err(RepositoryError::Runtime)?;
486 let Some(id_property) = descriptor.id_property() else {
487 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
488 "entity {} has no id property for graph upsert",
489 node.entity
490 ))));
491 };
492 let Some(id) = node
493 .values
494 .get(&id_property.name)
495 .filter(|value| !is_unassigned_id_value(value))
496 .cloned()
497 else {
498 node.comment = None;
500 return self.insert_graph_node_scoped(node, active_scope);
501 };
502
503 if node.operation == GraphOperation::Create || self
504 .fetch_graph_current_row(&node.entity, &id_property.name, &id, active_scope.map(|s| s.to_lineage_string()))?
505 .is_none()
506 {
507 node.comment = None;
508 return self.insert_graph_node_scoped(node, active_scope);
509 }
510
511 let mut one_relations = Vec::new();
512 let mut many_relations = Vec::new();
513 for (name, children) in std::mem::take(&mut node.relations) {
514 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
515 RepositoryError::Runtime(RuntimeError::MissingRelation {
516 entity: node.entity.clone(),
517 relation: name.clone(),
518 })
519 })?;
520 if relation.many {
521 many_relations.push((name, relation.clone(), children));
522 } else {
523 one_relations.push((name, relation.clone(), children));
524 }
525 }
526
527 for (name, relation, children) in one_relations {
528 if children.len() > 1 {
529 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
530 "relation {}.{} expects one child, got {}",
531 node.entity,
532 name,
533 children.len()
534 ))));
535 }
536 let mut saved_children = Vec::new();
537 for child in children {
538 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
539 let child_repo = self.scoped_repository(child.entity.clone());
540 let saved_child = child_repo.upsert_graph_node_scoped(child, active_scope)?;
541 if relation.attach {
542 let foreign_value = saved_child
543 .values
544 .get(&relation.foreign_key)
545 .cloned()
546 .ok_or_else(|| {
547 RepositoryError::Runtime(RuntimeError::Graph(format!(
548 "saved child {} missing foreign key {} for relation {}.{}",
549 relation.target_entity, relation.foreign_key, node.entity, name
550 )))
551 })?;
552 node.values
553 .insert(relation.local_key.clone(), foreign_value);
554 }
555 saved_children.push(saved_child);
556 }
557 node.relations.insert(name, saved_children);
558 }
559
560 let update = self.graph_update_command(&node, descriptor, id_property, &id);
561 if !update.values.is_empty() || update.expected_version.is_some() {
562 let prepared_update = self
563 .prepare_update_command(&update)
564 .map_err(RepositoryError::Runtime)?;
565 let lineage = active_scope.map(|s| s.to_lineage_string());
566 self.execute_prepared_update_with_comment(prepared_update.clone(), lineage)?;
567 for (field, value) in &prepared_update.values {
568 node.values.insert(field.clone(), value.clone());
569 }
570 if let Some(version_property) = descriptor.version_property() {
571 if let Some(expected_version) = prepared_update.expected_version {
572 node.values.insert(
573 version_property.name.clone(),
574 Value::I64(expected_version + 1),
575 );
576 }
577 }
578 }
579
580 for (name, relation, children) in many_relations {
581 let local_value = node
582 .values
583 .get(&relation.local_key)
584 .cloned()
585 .ok_or_else(|| {
586 RepositoryError::Runtime(RuntimeError::Graph(format!(
587 "parent {} missing local key {} for relation {}",
588 node.entity, relation.local_key, name
589 )))
590 })?;
591 let child_repo = self.scoped_repository(relation.target_entity.clone());
592 let existing_children = child_repo.fetch_graph_children(
593 &relation.target_entity,
594 &relation.foreign_key,
595 &local_value,
596 active_scope.map(|s| s.to_lineage_string()),
597 )?;
598 let child_descriptor = self
599 .repository
600 .metadata
601 .context
602 .require_entity(&relation.target_entity)
603 .map_err(RepositoryError::Runtime)?;
604 let child_id_property = child_descriptor.id_property().ok_or_else(|| {
605 RepositoryError::Runtime(RuntimeError::Graph(format!(
606 "entity {} has no id property for graph diff",
607 relation.target_entity
608 )))
609 })?;
610 let mut existing_by_id = BTreeMap::new();
611 for child in existing_children {
612 if let Some(id) = child.get(&child_id_property.name) {
613 existing_by_id.insert(graph_identity_key(id), child);
614 }
615 }
616
617 let mut seen = std::collections::BTreeSet::new();
618 let mut saved_children = Vec::new();
619 for mut child in children {
620 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
621 if relation.attach && child.operation != GraphOperation::Reference {
622 child
623 .values
624 .insert(relation.foreign_key.clone(), local_value.clone());
625 }
626 if let Some(child_id) = child
627 .values
628 .get(&child_id_property.name)
629 .filter(|value| !is_unassigned_id_value(value))
630 {
631 let key = graph_identity_key(child_id);
632 if !seen.insert(key.clone()) {
633 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
634 "duplicate child id {key} in relation {}.{}",
635 node.entity, name
636 ))));
637 }
638 }
639 saved_children.push(child_repo.upsert_graph_node_scoped(child, active_scope)?);
640 }
641
642 if relation.delete_missing {
643 for (id_key, existing) in existing_by_id {
644 if seen.contains(&id_key) {
645 continue;
646 }
647 let Some(existing_id) = existing.get(&child_id_property.name).cloned() else {
648 continue;
649 };
650 let mut delete =
651 DeleteCommand::new(relation.target_entity.clone(), existing_id);
652 if let Some(version) = graph_record_version(&existing, child_descriptor) {
653 delete = delete.expected_version(version);
654 }
655 let lineage = active_scope.map(|s| s.to_lineage_string());
656 child_repo.delete_scoped(&delete, lineage)?;
657 }
658 }
659
660 node.relations.insert(name, saved_children);
661 }
662
663 Ok(node)
664 }
665
666 fn validate_reference_node(
667 &self,
668 node: GraphNode,
669 lineage: Option<String>,
670 ) -> Result<GraphNode, RepositoryError<E::Error>> {
671 if !node.relations.is_empty() {
672 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
673 "reference node {} cannot contain child relations",
674 node.entity
675 ))));
676 }
677 let descriptor = self
678 .repository
679 .metadata
680 .context
681 .require_entity(&node.entity)
682 .map_err(RepositoryError::Runtime)?;
683 let id_property = descriptor.id_property().ok_or_else(|| {
684 RepositoryError::Runtime(RuntimeError::Graph(format!(
685 "entity {} has no id property for graph reference",
686 node.entity
687 )))
688 })?;
689 let id = node
690 .values
691 .get(&id_property.name)
692 .filter(|value| !is_unassigned_id_value(value))
693 .cloned()
694 .ok_or_else(|| {
695 RepositoryError::Runtime(RuntimeError::Graph(format!(
696 "reference node {} missing id property {}",
697 node.entity, id_property.name
698 )))
699 })?;
700
701 for field in node.values.keys() {
702 if field == &id_property.name {
703 continue;
704 }
705 if descriptor
706 .version_property()
707 .map(|property| field == &property.name)
708 .unwrap_or(false)
709 {
710 continue;
711 }
712 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
713 "reference node {} cannot carry mutable field {}",
714 node.entity, field
715 ))));
716 }
717
718 let current = self
719 .fetch_graph_current_row(&node.entity, &id_property.name, &id, lineage)?
720 .ok_or_else(|| {
721 RepositoryError::Runtime(RuntimeError::Graph(format!(
722 "reference node {}({}) does not exist",
723 node.entity,
724 graph_identity_key(&id)
725 )))
726 })?;
727
728 if let Some(version_property) = descriptor.version_property() {
729 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
730 if *existing_version < 0 {
731 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
732 "reference node {}({}) is deleted",
733 node.entity,
734 graph_identity_key(&id)
735 ))));
736 }
737 if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
738 {
739 if expected_version != existing_version {
740 return Err(RepositoryError::Runtime(
741 RuntimeError::OptimisticLockConflict {
742 entity: node.entity,
743 id: graph_identity_key(&id),
744 },
745 ));
746 }
747 }
748 }
749 }
750
751 Ok(GraphNode {
752 entity: node.entity,
753 values: current,
754 relations: BTreeMap::new(),
755 operation: GraphOperation::Reference,
756 comment: None,
757 })
758 }
759
760 fn validate_remove_node(&self, node: &GraphNode, lineage: Option<String>) -> Result<(), RepositoryError<E::Error>> {
761 if !node.relations.is_empty() {
762 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
763 "remove node {} cannot contain child relations",
764 node.entity
765 ))));
766 }
767 let descriptor = self
768 .repository
769 .metadata
770 .context
771 .require_entity(&node.entity)
772 .map_err(RepositoryError::Runtime)?;
773 let id_property = descriptor.id_property().ok_or_else(|| {
774 RepositoryError::Runtime(RuntimeError::Graph(format!(
775 "entity {} has no id property for graph remove",
776 node.entity
777 )))
778 })?;
779 let id = node
780 .values
781 .get(&id_property.name)
782 .filter(|value| !is_unassigned_id_value(value))
783 .cloned()
784 .ok_or_else(|| {
785 RepositoryError::Runtime(RuntimeError::Graph(format!(
786 "remove node {} missing id property {}",
787 node.entity, id_property.name
788 )))
789 })?;
790 let current = self
791 .fetch_graph_current_row(&node.entity, &id_property.name, &id, lineage)?
792 .ok_or_else(|| {
793 RepositoryError::Runtime(RuntimeError::Graph(format!(
794 "remove node {}({}) does not exist",
795 node.entity,
796 graph_identity_key(&id)
797 )))
798 })?;
799 if let Some(version_property) = descriptor.version_property() {
800 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
801 if *existing_version < 0 {
802 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
803 "remove node {}({}) is already deleted",
804 node.entity,
805 graph_identity_key(&id)
806 ))));
807 }
808 }
809 }
810 Ok(())
811 }
812
813 fn graph_node_from_record(
814 &self,
815 entity: &str,
816 record: Record,
817 ) -> Result<GraphNode, RuntimeError> {
818 let descriptor = self.repository.metadata.context.require_entity(entity)?;
819 let mut node = GraphNode::new(entity);
820
821 for (field, value) in record {
822 if field == "_comment" {
823 if let Value::Text(comment) = value {
824 node.set_comment(comment);
825 }
826 continue;
827 }
828 let Some(relation) = descriptor.relation_by_name(&field) else {
829 node.values.insert(field, value);
830 continue;
831 };
832
833 match value {
834 Value::Null => {
835 node.relations.entry(field).or_default();
836 }
837 Value::Object(record) => {
838 let child = self.graph_node_from_record(&relation.target_entity, record)?;
839 node.relations.entry(field).or_default().push(child);
840 }
841 Value::List(values) => {
842 let children = node.relations.entry(field.clone()).or_default();
843 for value in values {
844 let Value::Object(record) = value else {
845 return Err(RuntimeError::Graph(format!(
846 "relation {}.{} expects object children, got {:?}",
847 entity, field, value
848 )));
849 };
850 children
851 .push(self.graph_node_from_record(&relation.target_entity, record)?);
852 }
853 }
854 other => {
855 return Err(RuntimeError::Graph(format!(
856 "relation {}.{} expects object/list/null, got {:?}",
857 entity, field, other
858 )));
859 }
860 }
861 }
862
863 Ok(node)
864 }
865
866 fn graph_update_command(
867 &self,
868 node: &GraphNode,
869 descriptor: &EntityDescriptor,
870 id_property: &PropertyDescriptor,
871 id: &Value,
872 ) -> UpdateCommand {
873 let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
874 if let Some(version_property) = descriptor.version_property() {
875 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
876 command = command.expected_version(*version);
877 }
878 }
879 for property in descriptor.properties.iter().filter(|property| {
880 !property.is_id && !property.is_version && node.values.contains_key(&property.name)
881 }) {
882 if property.name == id_property.name {
883 continue;
884 }
885 if let Some(value) = node.values.get(&property.name) {
886 command.values.insert(property.name.clone(), value.clone());
887 }
888 }
889 command
890 }
891
892 fn delete_graph_node<'s>(
893 &self,
894 node: &GraphNode,
895 parent_scope: Option<&'s ScopedCommentNode<'s>>,
896 ) -> Result<u64, RepositoryError<E::Error>> {
897 let descriptor = self
898 .repository
899 .metadata
900 .context
901 .require_entity(&node.entity)
902 .map_err(RepositoryError::Runtime)?;
903 let id_property = descriptor.id_property().ok_or_else(|| {
904 RepositoryError::Runtime(RuntimeError::Graph(format!(
905 "entity {} has no id property for graph remove",
906 node.entity
907 )))
908 })?;
909 let id = node
910 .values
911 .get(&id_property.name)
912 .filter(|value| !is_unassigned_id_value(value))
913 .cloned()
914 .ok_or_else(|| {
915 RepositoryError::Runtime(RuntimeError::Graph(format!(
916 "remove node {} missing id property {}",
917 node.entity, id_property.name
918 )))
919 })?;
920 let mut delete = DeleteCommand::new(node.entity.clone(), id);
921 if let Some(version_property) = descriptor.version_property() {
922 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
923 delete = delete.expected_version(*version);
924 }
925 }
926
927 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
929 parent: parent_scope,
930 track: CommentTrack {
931 entity_type: node.entity.clone(),
932 entity_id: node
933 .id()
934 .map(|v| match v {
935 Value::U64(n) => n.to_string(),
936 Value::I64(n) => n.to_string(),
937 Value::Text(s) => s.clone(),
938 other => format!("{other:?}"),
939 })
940 .unwrap_or_else(|| "(pending)".into()),
941 comment: c.clone(),
942 },
943 });
944 let active_scope = current_scope.as_ref().or(parent_scope);
945 let lineage = active_scope.map(|s| s.to_lineage_string());
946
947 self.delete_scoped(&delete, lineage)
948 }
949
950 fn fetch_graph_current_row(
951 &self,
952 entity: &str,
953 id_property: &str,
954 id: &Value,
955 lineage: Option<String>,
956 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
957 let mut query = SelectQuery::new(entity).filter(Expr::eq(id_property, id.clone()));
958 if let Some(lineage) = lineage {
959 query = query.comment(lineage);
960 }
961 let mut rows = self
962 .scoped_repository(entity.to_owned())
963 .fetch_all(&query)?;
964 Ok(rows.pop())
965 }
966
967 fn fetch_graph_children(
968 &self,
969 entity: &str,
970 foreign_key: &str,
971 parent_value: &Value,
972 lineage: Option<String>,
973 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
974 let mut query = SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone()));
975 if let Some(lineage) = lineage {
976 query = query.comment(lineage);
977 }
978 self.scoped_repository(entity.to_owned()).fetch_all(&query)
979 }
980}