1use std::collections::BTreeMap;
2
3use teaql_core::{
4 DeleteCommand, Entity, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record,
5 SelectQuery, UpdateCommand, Value,
6};
7use teaql_sql::SqlDialect;
8
9use crate::{
10 CommentTrack, GraphMutationKind, GraphMutationPlan, GraphNode, GraphOperation, RepositoryError,
11 RuntimeError, ScopedCommentNode, sorted_update_fields,
12};
13
14use super::{GraphTransactionBoundary, QueryExecutor, ResolvedRepository, helpers::*};
15
16impl<'a, D, E> ResolvedRepository<'a, D, E>
17where
18 D: SqlDialect,
19 E: QueryExecutor,
20{
21 pub fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
22 if node.entity != self.entity {
23 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
24 "resolved repository {} cannot save graph root {}",
25 self.entity, node.entity
26 ))));
27 }
28 let boundary = self
29 .repository
30 .executor
31 .begin_transaction()
32 .map_err(RepositoryError::Executor)?;
33 if matches!(boundary, GraphTransactionBoundary::Unsupported) {
34 return Err(RepositoryError::Runtime(RuntimeError::Graph(
35 "save_graph requires a transactional executor".to_owned(),
36 )));
37 }
38 let result = self.upsert_graph_node_scoped(node, None);
39 match result {
40 Ok(saved) => {
41 if matches!(boundary, GraphTransactionBoundary::Started) {
42 self.repository
43 .executor
44 .commit_transaction()
45 .map_err(RepositoryError::Executor)?;
46 }
47 Ok(saved)
48 }
49 Err(err) => {
50 if !matches!(boundary, GraphTransactionBoundary::Unsupported) {
51 self.repository
52 .executor
53 .rollback_transaction()
54 .map_err(RepositoryError::Executor)?;
55 }
56 Err(err)
57 }
58 }
59 }
60
61
62 pub fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
63 where
64 T: Entity,
65 {
66 let node = self
67 .graph_node_from_entity(entity)
68 .map_err(RepositoryError::Runtime)?;
69 self.save_graph(node)
70 }
71
72 pub fn save_entity_graph_with_comment<T>(
73 &self,
74 entity: T,
75 comment: impl Into<String>,
76 ) -> Result<GraphNode, RepositoryError<E::Error>>
77 where
78 T: Entity,
79 {
80 let mut node = self
81 .graph_node_from_entity(entity)
82 .map_err(RepositoryError::Runtime)?;
83 node.set_comment(comment);
84 self.save_graph(node)
85 }
86
87 pub fn create_entity_graph_with_comment<T>(
91 &self,
92 entity: T,
93 comment: impl Into<String>,
94 ) -> Result<GraphNode, RepositoryError<E::Error>>
95 where
96 T: Entity,
97 {
98 let mut node = self
99 .graph_node_from_entity(entity)
100 .map_err(RepositoryError::Runtime)?;
101 node.operation = GraphOperation::Create;
102 node.set_comment(comment);
103 self.save_graph(node)
104 }
105
106 pub fn plan_graph(
107 &self,
108 node: GraphNode,
109 ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
110 if node.entity != self.entity {
111 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
112 "resolved repository {} cannot plan graph root {}",
113 self.entity, node.entity
114 ))));
115 }
116 let mut node = node;
117 let mut plan = GraphMutationPlan::default();
118 self.collect_graph_plan(&mut node, &mut plan, None, false)?;
119 plan.planned_root = Some(node);
120 plan.rebuild_batches();
121 Ok(plan)
122 }
123
124 pub fn execute_graph_plan(
125 &self,
126 plan: GraphMutationPlan,
127 ) -> Result<GraphNode, RepositoryError<E::Error>> {
128 let Some(root) = plan.planned_root else {
129 return Err(RepositoryError::Runtime(RuntimeError::Graph(
130 "graph mutation plan has no planned root".to_owned(),
131 )));
132 };
133
134 self.upsert_graph_node_scoped(root, None)
135 }
136
137 pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
138 where
139 T: Entity,
140 {
141 let descriptor = T::entity_descriptor();
142 if descriptor.name != self.entity {
143 return Err(RuntimeError::Graph(format!(
144 "resolved repository {} cannot extract graph root {}",
145 self.entity, descriptor.name
146 )));
147 }
148 self.graph_node_from_record(&descriptor.name, entity.into_record())
149 }
150
151 fn collect_graph_plan<'s>(
152 &self,
153 node: &mut GraphNode,
154 plan: &mut GraphMutationPlan,
155 parent_scope: Option<&'s ScopedCommentNode<'s>>,
156 parent_is_create: bool,
157 ) -> Result<(), RepositoryError<E::Error>> {
158 match node.operation {
159 GraphOperation::Reference => {
160 plan.push(
161 node.entity.clone(),
162 GraphMutationKind::Reference,
163 node.values.clone(),
164 Vec::new(),
165 );
166 return Ok(());
167 }
168 GraphOperation::Remove => {
169 plan.push(
170 node.entity.clone(),
171 GraphMutationKind::Delete,
172 node.values.clone(),
173 Vec::new(),
174 );
175 return Ok(());
176 }
177 GraphOperation::Upsert | GraphOperation::Create => {}
178 }
179
180 let descriptor = self
181 .repository
182 .metadata
183 .context
184 .require_entity(&node.entity)
185 .map_err(RepositoryError::Runtime)?;
186
187 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
189 parent: parent_scope,
190 track: CommentTrack {
191 entity_type: node.entity.clone(),
192 entity_id: node
193 .id()
194 .map(|v| match v {
195 Value::U64(n) => n.to_string(),
196 Value::I64(n) => n.to_string(),
197 Value::Text(s) => s.clone(),
198 other => format!("{other:?}"),
199 })
200 .unwrap_or_else(|| "(pending)".into()),
201 comment: c.clone(),
202 },
203 });
204 let active_scope = current_scope.as_ref().or(parent_scope);
205
206 let id_property = descriptor.id_property().cloned();
207 let id = id_property.as_ref().and_then(|property| {
208 node.values
209 .get(&property.name)
210 .filter(|value| !is_unassigned_id_value(value))
211 .cloned()
212 });
213
214 let is_create_op = node.operation == GraphOperation::Create || (parent_is_create && node.operation == GraphOperation::Upsert);
215
216 let is_update = if is_create_op {
217 false
218 } else {
219 match (id_property.as_ref(), id.as_ref()) {
220 (Some(id_property), Some(id)) => self
221 .fetch_graph_current_row(&node.entity, &id_property.name, id, active_scope.map(|s| s.to_lineage_string()))?
222 .is_some(),
223 _ => false,
224 }
225 };
226 if !is_update {
227 if let Some(id_property) = id_property.as_ref() {
228 let needs_id = !node.values.contains_key(&id_property.name)
229 || node
230 .values
231 .get(&id_property.name)
232 .is_some_and(is_unassigned_id_value);
233 if needs_id {
234 let id = self
235 .repository
236 .metadata
237 .context
238 .next_id(&node.entity)
239 .map_err(RepositoryError::Runtime)?;
240 node.values.insert(id_property.name.clone(), Value::U64(id));
241 }
242 }
243 ensure_initial_version(&mut node.values, descriptor);
244 }
245 let update_fields = if is_update {
246 let mut excluded = Vec::new();
247 if let Some(id_property) = id_property.as_ref() {
248 excluded.push(id_property.name.clone());
249 }
250 if let Some(version_property) = descriptor.version_property() {
251 excluded.push(version_property.name.clone());
252 }
253 sorted_update_fields(&node.values, excluded)
254 } else {
255 Vec::new()
256 };
257 plan.push(
258 node.entity.clone(),
259 if is_update {
260 GraphMutationKind::Update
261 } else {
262 GraphMutationKind::Create
263 },
264 node.values.clone(),
265 update_fields,
266 );
267
268 for (name, children) in &mut node.relations {
269 let relation = descriptor.relation_by_name(name).ok_or_else(|| {
270 RepositoryError::Runtime(RuntimeError::MissingRelation {
271 entity: node.entity.clone(),
272 relation: name.clone(),
273 })
274 })?;
275 let child_repo = self.scoped_repository(relation.target_entity.clone());
276 for child in children {
277 ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
278 child_repo.collect_graph_plan(child, plan, active_scope, is_create_op)?;
279 }
280 }
281 Ok(())
282 }
283
284 fn insert_graph_node_scoped<'s>(
285 &self,
286 mut node: GraphNode,
287 parent_scope: Option<&'s ScopedCommentNode<'s>>,
288 ) -> Result<GraphNode, RepositoryError<E::Error>> {
289 match node.operation {
290 GraphOperation::Upsert | GraphOperation::Create => {}
291 GraphOperation::Reference => return self.validate_reference_node(node, parent_scope.map(|s| s.to_lineage_string())),
292 GraphOperation::Remove => {
293 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
294 "create graph cannot remove node {}",
295 node.entity
296 ))));
297 }
298 }
299
300 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
302 parent: parent_scope,
303 track: CommentTrack {
304 entity_type: node.entity.clone(),
305 entity_id: node
306 .id()
307 .map(|v| match v {
308 Value::U64(n) => n.to_string(),
309 Value::I64(n) => n.to_string(),
310 Value::Text(s) => s.clone(),
311 other => format!("{other:?}"),
312 })
313 .unwrap_or_else(|| "(pending)".into()),
314 comment: c.clone(),
315 },
316 });
317 let active_scope = current_scope.as_ref().or(parent_scope);
318
319 let descriptor = self
320 .repository
321 .metadata
322 .context
323 .require_entity(&node.entity)
324 .map_err(RepositoryError::Runtime)?;
325
326 let mut one_relations = Vec::new();
327 let mut many_relations = Vec::new();
328 for (name, children) in std::mem::take(&mut node.relations) {
329 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
330 RepositoryError::Runtime(RuntimeError::MissingRelation {
331 entity: node.entity.clone(),
332 relation: name.clone(),
333 })
334 })?;
335 if relation.many {
336 many_relations.push((name, relation.clone(), children));
337 } else {
338 one_relations.push((name, relation.clone(), children));
339 }
340 }
341
342 for (name, relation, children) in one_relations {
343 if children.len() > 1 {
344 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
345 "relation {}.{} expects one child, got {}",
346 node.entity,
347 name,
348 children.len()
349 ))));
350 }
351 let mut saved_children = Vec::new();
352 for child in children {
353 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
354 let child_repo = self.scoped_repository(child.entity.clone());
355 let saved_child = child_repo.insert_graph_node_scoped(child, active_scope)?;
356 if relation.attach {
357 let foreign_value = saved_child
358 .values
359 .get(&relation.foreign_key)
360 .cloned()
361 .ok_or_else(|| {
362 RepositoryError::Runtime(RuntimeError::Graph(format!(
363 "saved child {} missing foreign key {} for relation {}.{}",
364 relation.target_entity, relation.foreign_key, node.entity, name
365 )))
366 })?;
367 node.values
368 .insert(relation.local_key.clone(), foreign_value);
369 }
370 saved_children.push(saved_child);
371 }
372 node.relations.insert(name, saved_children);
373 }
374
375 let command = self
376 .prepare_insert_command(&InsertCommand {
377 entity: node.entity.clone(),
378 values: node.values.clone(),
379 })
380 .map_err(RepositoryError::Runtime)?;
381 let lineage = active_scope.map(|s| s.to_lineage_string());
382 self.execute_prepared_insert_with_comment(command.clone(), lineage)?;
383 node.values = command.values;
384
385 for (name, relation, children) in many_relations {
386 let local_value = node
387 .values
388 .get(&relation.local_key)
389 .cloned()
390 .ok_or_else(|| {
391 RepositoryError::Runtime(RuntimeError::Graph(format!(
392 "parent {} missing local key {} for relation {}",
393 node.entity, relation.local_key, name
394 )))
395 })?;
396 let mut saved_children = Vec::new();
397 for mut child in children {
398 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
399 if relation.attach {
400 child
401 .values
402 .insert(relation.foreign_key.clone(), local_value.clone());
403 }
404 let child_repo = self.scoped_repository(child.entity.clone());
405 saved_children.push(child_repo.insert_graph_node_scoped(child, active_scope)?);
406 }
407 node.relations.insert(name, saved_children);
408 }
409
410 Ok(node)
411 }
412
413 fn upsert_graph_node_scoped<'s>(
414 &self,
415 mut node: GraphNode,
416 parent_scope: Option<&'s ScopedCommentNode<'s>>,
417 ) -> Result<GraphNode, RepositoryError<E::Error>> {
418 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
420 parent: parent_scope,
421 track: CommentTrack {
422 entity_type: node.entity.clone(),
423 entity_id: node
424 .id()
425 .map(|v| match v {
426 Value::U64(n) => n.to_string(),
427 Value::I64(n) => n.to_string(),
428 Value::Text(s) => s.clone(),
429 other => format!("{other:?}"),
430 })
431 .unwrap_or_else(|| "(pending)".into()),
432 comment: c.clone(),
433 },
434 });
435 let active_scope = current_scope.as_ref().or(parent_scope);
436
437 match node.operation {
438 GraphOperation::Upsert | GraphOperation::Create => {}
439 GraphOperation::Reference => return self.validate_reference_node(node, active_scope.map(|s| s.to_lineage_string())),
440 GraphOperation::Remove => {
441 self.validate_remove_node(&node, active_scope.map(|s| s.to_lineage_string()))?;
442 self.delete_graph_node(&node, parent_scope)?;
443 return Ok(node);
444 }
445 }
446
447 let descriptor = self
448 .repository
449 .metadata
450 .context
451 .require_entity(&node.entity)
452 .map_err(RepositoryError::Runtime)?;
453 let Some(id_property) = descriptor.id_property() else {
454 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
455 "entity {} has no id property for graph upsert",
456 node.entity
457 ))));
458 };
459 let Some(id) = node
460 .values
461 .get(&id_property.name)
462 .filter(|value| !is_unassigned_id_value(value))
463 .cloned()
464 else {
465 node.comment = None;
467 return self.insert_graph_node_scoped(node, active_scope);
468 };
469
470 if node.operation == GraphOperation::Create || self
471 .fetch_graph_current_row(&node.entity, &id_property.name, &id, active_scope.map(|s| s.to_lineage_string()))?
472 .is_none()
473 {
474 node.comment = None;
475 return self.insert_graph_node_scoped(node, active_scope);
476 }
477
478 let mut one_relations = Vec::new();
479 let mut many_relations = Vec::new();
480 for (name, children) in std::mem::take(&mut node.relations) {
481 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
482 RepositoryError::Runtime(RuntimeError::MissingRelation {
483 entity: node.entity.clone(),
484 relation: name.clone(),
485 })
486 })?;
487 if relation.many {
488 many_relations.push((name, relation.clone(), children));
489 } else {
490 one_relations.push((name, relation.clone(), children));
491 }
492 }
493
494 for (name, relation, children) in one_relations {
495 if children.len() > 1 {
496 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
497 "relation {}.{} expects one child, got {}",
498 node.entity,
499 name,
500 children.len()
501 ))));
502 }
503 let mut saved_children = Vec::new();
504 for child in children {
505 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
506 let child_repo = self.scoped_repository(child.entity.clone());
507 let saved_child = child_repo.upsert_graph_node_scoped(child, active_scope)?;
508 if relation.attach {
509 let foreign_value = saved_child
510 .values
511 .get(&relation.foreign_key)
512 .cloned()
513 .ok_or_else(|| {
514 RepositoryError::Runtime(RuntimeError::Graph(format!(
515 "saved child {} missing foreign key {} for relation {}.{}",
516 relation.target_entity, relation.foreign_key, node.entity, name
517 )))
518 })?;
519 node.values
520 .insert(relation.local_key.clone(), foreign_value);
521 }
522 saved_children.push(saved_child);
523 }
524 node.relations.insert(name, saved_children);
525 }
526
527 let update = self.graph_update_command(&node, descriptor, id_property, &id);
528 if !update.values.is_empty() || update.expected_version.is_some() {
529 let prepared_update = self
530 .prepare_update_command(&update)
531 .map_err(RepositoryError::Runtime)?;
532 let lineage = active_scope.map(|s| s.to_lineage_string());
533 self.execute_prepared_update_with_comment(prepared_update.clone(), lineage)?;
534 for (field, value) in &prepared_update.values {
535 node.values.insert(field.clone(), value.clone());
536 }
537 if let Some(version_property) = descriptor.version_property() {
538 if let Some(expected_version) = prepared_update.expected_version {
539 node.values.insert(
540 version_property.name.clone(),
541 Value::I64(expected_version + 1),
542 );
543 }
544 }
545 }
546
547 for (name, relation, children) in many_relations {
548 let local_value = node
549 .values
550 .get(&relation.local_key)
551 .cloned()
552 .ok_or_else(|| {
553 RepositoryError::Runtime(RuntimeError::Graph(format!(
554 "parent {} missing local key {} for relation {}",
555 node.entity, relation.local_key, name
556 )))
557 })?;
558 let child_repo = self.scoped_repository(relation.target_entity.clone());
559 let existing_children = child_repo.fetch_graph_children(
560 &relation.target_entity,
561 &relation.foreign_key,
562 &local_value,
563 active_scope.map(|s| s.to_lineage_string()),
564 )?;
565 let child_descriptor = self
566 .repository
567 .metadata
568 .context
569 .require_entity(&relation.target_entity)
570 .map_err(RepositoryError::Runtime)?;
571 let child_id_property = child_descriptor.id_property().ok_or_else(|| {
572 RepositoryError::Runtime(RuntimeError::Graph(format!(
573 "entity {} has no id property for graph diff",
574 relation.target_entity
575 )))
576 })?;
577 let mut existing_by_id = BTreeMap::new();
578 for child in existing_children {
579 if let Some(id) = child.get(&child_id_property.name) {
580 existing_by_id.insert(graph_identity_key(id), child);
581 }
582 }
583
584 let mut seen = std::collections::BTreeSet::new();
585 let mut saved_children = Vec::new();
586 for mut child in children {
587 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
588 if relation.attach && child.operation != GraphOperation::Reference {
589 child
590 .values
591 .insert(relation.foreign_key.clone(), local_value.clone());
592 }
593 if let Some(child_id) = child
594 .values
595 .get(&child_id_property.name)
596 .filter(|value| !is_unassigned_id_value(value))
597 {
598 let key = graph_identity_key(child_id);
599 if !seen.insert(key.clone()) {
600 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
601 "duplicate child id {key} in relation {}.{}",
602 node.entity, name
603 ))));
604 }
605 }
606 saved_children.push(child_repo.upsert_graph_node_scoped(child, active_scope)?);
607 }
608
609 if relation.delete_missing {
610 for (id_key, existing) in existing_by_id {
611 if seen.contains(&id_key) {
612 continue;
613 }
614 let Some(existing_id) = existing.get(&child_id_property.name).cloned() else {
615 continue;
616 };
617 let mut delete =
618 DeleteCommand::new(relation.target_entity.clone(), existing_id);
619 if let Some(version) = graph_record_version(&existing, child_descriptor) {
620 delete = delete.expected_version(version);
621 }
622 let lineage = active_scope.map(|s| s.to_lineage_string());
623 child_repo.delete_scoped(&delete, lineage)?;
624 }
625 }
626
627 node.relations.insert(name, saved_children);
628 }
629
630 Ok(node)
631 }
632
633 fn validate_reference_node(
634 &self,
635 node: GraphNode,
636 lineage: Option<String>,
637 ) -> Result<GraphNode, RepositoryError<E::Error>> {
638 if !node.relations.is_empty() {
639 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
640 "reference node {} cannot contain child relations",
641 node.entity
642 ))));
643 }
644 let descriptor = self
645 .repository
646 .metadata
647 .context
648 .require_entity(&node.entity)
649 .map_err(RepositoryError::Runtime)?;
650 let id_property = descriptor.id_property().ok_or_else(|| {
651 RepositoryError::Runtime(RuntimeError::Graph(format!(
652 "entity {} has no id property for graph reference",
653 node.entity
654 )))
655 })?;
656 let id = node
657 .values
658 .get(&id_property.name)
659 .filter(|value| !is_unassigned_id_value(value))
660 .cloned()
661 .ok_or_else(|| {
662 RepositoryError::Runtime(RuntimeError::Graph(format!(
663 "reference node {} missing id property {}",
664 node.entity, id_property.name
665 )))
666 })?;
667
668 for field in node.values.keys() {
669 if field == &id_property.name {
670 continue;
671 }
672 if descriptor
673 .version_property()
674 .map(|property| field == &property.name)
675 .unwrap_or(false)
676 {
677 continue;
678 }
679 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
680 "reference node {} cannot carry mutable field {}",
681 node.entity, field
682 ))));
683 }
684
685 let current = self
686 .fetch_graph_current_row(&node.entity, &id_property.name, &id, lineage)?
687 .ok_or_else(|| {
688 RepositoryError::Runtime(RuntimeError::Graph(format!(
689 "reference node {}({}) does not exist",
690 node.entity,
691 graph_identity_key(&id)
692 )))
693 })?;
694
695 if let Some(version_property) = descriptor.version_property() {
696 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
697 if *existing_version < 0 {
698 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
699 "reference node {}({}) is deleted",
700 node.entity,
701 graph_identity_key(&id)
702 ))));
703 }
704 if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
705 {
706 if expected_version != existing_version {
707 return Err(RepositoryError::Runtime(
708 RuntimeError::OptimisticLockConflict {
709 entity: node.entity,
710 id: graph_identity_key(&id),
711 },
712 ));
713 }
714 }
715 }
716 }
717
718 Ok(GraphNode {
719 entity: node.entity,
720 values: current,
721 relations: BTreeMap::new(),
722 operation: GraphOperation::Reference,
723 comment: None,
724 })
725 }
726
727 fn validate_remove_node(&self, node: &GraphNode, lineage: Option<String>) -> Result<(), RepositoryError<E::Error>> {
728 if !node.relations.is_empty() {
729 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
730 "remove node {} cannot contain child relations",
731 node.entity
732 ))));
733 }
734 let descriptor = self
735 .repository
736 .metadata
737 .context
738 .require_entity(&node.entity)
739 .map_err(RepositoryError::Runtime)?;
740 let id_property = descriptor.id_property().ok_or_else(|| {
741 RepositoryError::Runtime(RuntimeError::Graph(format!(
742 "entity {} has no id property for graph remove",
743 node.entity
744 )))
745 })?;
746 let id = node
747 .values
748 .get(&id_property.name)
749 .filter(|value| !is_unassigned_id_value(value))
750 .cloned()
751 .ok_or_else(|| {
752 RepositoryError::Runtime(RuntimeError::Graph(format!(
753 "remove node {} missing id property {}",
754 node.entity, id_property.name
755 )))
756 })?;
757 let current = self
758 .fetch_graph_current_row(&node.entity, &id_property.name, &id, lineage)?
759 .ok_or_else(|| {
760 RepositoryError::Runtime(RuntimeError::Graph(format!(
761 "remove node {}({}) does not exist",
762 node.entity,
763 graph_identity_key(&id)
764 )))
765 })?;
766 if let Some(version_property) = descriptor.version_property() {
767 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
768 if *existing_version < 0 {
769 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
770 "remove node {}({}) is already deleted",
771 node.entity,
772 graph_identity_key(&id)
773 ))));
774 }
775 }
776 }
777 Ok(())
778 }
779
780 fn graph_node_from_record(
781 &self,
782 entity: &str,
783 record: Record,
784 ) -> Result<GraphNode, RuntimeError> {
785 let descriptor = self.repository.metadata.context.require_entity(entity)?;
786 let mut node = GraphNode::new(entity);
787
788 for (field, value) in record {
789 if field == "_comment" {
790 if let Value::Text(comment) = value {
791 node.set_comment(comment);
792 }
793 continue;
794 }
795 let Some(relation) = descriptor.relation_by_name(&field) else {
796 node.values.insert(field, value);
797 continue;
798 };
799
800 match value {
801 Value::Null => {
802 node.relations.entry(field).or_default();
803 }
804 Value::Object(record) => {
805 let child = self.graph_node_from_record(&relation.target_entity, record)?;
806 node.relations.entry(field).or_default().push(child);
807 }
808 Value::List(values) => {
809 let children = node.relations.entry(field.clone()).or_default();
810 for value in values {
811 let Value::Object(record) = value else {
812 return Err(RuntimeError::Graph(format!(
813 "relation {}.{} expects object children, got {:?}",
814 entity, field, value
815 )));
816 };
817 children
818 .push(self.graph_node_from_record(&relation.target_entity, record)?);
819 }
820 }
821 other => {
822 return Err(RuntimeError::Graph(format!(
823 "relation {}.{} expects object/list/null, got {:?}",
824 entity, field, other
825 )));
826 }
827 }
828 }
829
830 Ok(node)
831 }
832
833 fn graph_update_command(
834 &self,
835 node: &GraphNode,
836 descriptor: &EntityDescriptor,
837 id_property: &PropertyDescriptor,
838 id: &Value,
839 ) -> UpdateCommand {
840 let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
841 if let Some(version_property) = descriptor.version_property() {
842 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
843 command = command.expected_version(*version);
844 }
845 }
846 for property in descriptor.properties.iter().filter(|property| {
847 !property.is_id && !property.is_version && node.values.contains_key(&property.name)
848 }) {
849 if property.name == id_property.name {
850 continue;
851 }
852 if let Some(value) = node.values.get(&property.name) {
853 command.values.insert(property.name.clone(), value.clone());
854 }
855 }
856 command
857 }
858
859 fn delete_graph_node<'s>(
860 &self,
861 node: &GraphNode,
862 parent_scope: Option<&'s ScopedCommentNode<'s>>,
863 ) -> Result<u64, RepositoryError<E::Error>> {
864 let descriptor = self
865 .repository
866 .metadata
867 .context
868 .require_entity(&node.entity)
869 .map_err(RepositoryError::Runtime)?;
870 let id_property = descriptor.id_property().ok_or_else(|| {
871 RepositoryError::Runtime(RuntimeError::Graph(format!(
872 "entity {} has no id property for graph remove",
873 node.entity
874 )))
875 })?;
876 let id = node
877 .values
878 .get(&id_property.name)
879 .filter(|value| !is_unassigned_id_value(value))
880 .cloned()
881 .ok_or_else(|| {
882 RepositoryError::Runtime(RuntimeError::Graph(format!(
883 "remove node {} missing id property {}",
884 node.entity, id_property.name
885 )))
886 })?;
887 let mut delete = DeleteCommand::new(node.entity.clone(), id);
888 if let Some(version_property) = descriptor.version_property() {
889 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
890 delete = delete.expected_version(*version);
891 }
892 }
893
894 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
896 parent: parent_scope,
897 track: CommentTrack {
898 entity_type: node.entity.clone(),
899 entity_id: node
900 .id()
901 .map(|v| match v {
902 Value::U64(n) => n.to_string(),
903 Value::I64(n) => n.to_string(),
904 Value::Text(s) => s.clone(),
905 other => format!("{other:?}"),
906 })
907 .unwrap_or_else(|| "(pending)".into()),
908 comment: c.clone(),
909 },
910 });
911 let active_scope = current_scope.as_ref().or(parent_scope);
912 let lineage = active_scope.map(|s| s.to_lineage_string());
913
914 self.delete_scoped(&delete, lineage)
915 }
916
917 fn fetch_graph_current_row(
918 &self,
919 entity: &str,
920 id_property: &str,
921 id: &Value,
922 lineage: Option<String>,
923 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
924 let mut query = SelectQuery::new(entity).filter(Expr::eq(id_property, id.clone()));
925 if let Some(lineage) = lineage {
926 query = query.comment(lineage);
927 }
928 let mut rows = self
929 .scoped_repository(entity.to_owned())
930 .fetch_all(&query)?;
931 Ok(rows.pop())
932 }
933
934 fn fetch_graph_children(
935 &self,
936 entity: &str,
937 foreign_key: &str,
938 parent_value: &Value,
939 lineage: Option<String>,
940 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
941 let mut query = SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone()));
942 if let Some(lineage) = lineage {
943 query = query.comment(lineage);
944 }
945 self.scoped_repository(entity.to_owned()).fetch_all(&query)
946 }
947}