1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use teaql_core::{
5 DeleteCommand, Entity, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record,
6 SelectQuery, UpdateCommand, Value,
7};
8
9use crate::{
10 GraphMutationKind, GraphMutationPlan, GraphNode, GraphOperation, RepositoryError,
11 RuntimeError, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
12};
13use crate::entity_status::EntityStatus;
14
15use super::{ResolvedRepository, helpers::*};
16
17impl<'a, E> ResolvedRepository<'a, E>
18where
19 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
20{
21 pub async fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
22 if node.entity != self.entity {
23 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
24 "resolved repository {} cannot save graph root {}",
25 self.entity, node.entity
26 ))));
27 }
28 self.upsert_graph_node_scoped(node, None).await
29 }
30
31 pub async fn save_entity_graph_from(&self, graph: teaql_core::EntityGraph) -> Result<GraphNode, RepositoryError<E::Error>> {
32 fn convert(node: teaql_core::EntityGraphNode) -> GraphNode {
33 let mut relations = BTreeMap::new();
34 for (rel_name, child) in node.children {
35 relations.entry(rel_name).or_insert_with(Vec::new).push(convert(child));
36 }
37 GraphNode {
38 entity: node.entity_type,
39 values: node.record,
40 relations,
41 operation: match node.operation {
42 teaql_core::EntityGraphOperation::Save => crate::GraphOperation::Upsert,
43 teaql_core::EntityGraphOperation::Delete => crate::GraphOperation::Remove,
44 },
45 comment: node.comment,
46 dirty_fields: None, original_values: None,
47 }
48 }
49 self.save_graph(convert(graph.root)).await
50 }
51
52 pub async fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
53 where
54 T: Entity,
55 {
56 let node = self
57 .graph_node_from_entity(entity)
58 .map_err(RepositoryError::Runtime)?;
59 self.save_graph(node).await
60 }
61
62 pub async fn save_entity<T>(&self, entity: T, status: EntityStatus) -> Result<GraphNode, RepositoryError<E::Error>>
63 where
64 T: Entity,
65 {
66 if !status.need_persist() {
67 return Ok(GraphNode::new(&self.entity));
68 }
69 if status.is_deleted() {
70 let mut node = self.graph_node_from_entity(entity)
71 .map_err(RepositoryError::Runtime)?;
72 node.operation = GraphOperation::Remove;
73 node.relations.clear();
74 self.save_graph(node).await
75 } else {
76 self.save_entity_graph(entity).await
77 }
78 }
79 pub async fn save_entity_with_comment<T>(&self, entity: T, status: EntityStatus, comment: impl Into<String>) -> Result<GraphNode, RepositoryError<E::Error>>
80 where
81 T: Entity,
82 {
83 if status.is_deleted() {
84 let mut node = self.graph_node_from_entity(entity)
85 .map_err(RepositoryError::Runtime)?;
86 node.operation = GraphOperation::Remove;
87 node.relations.clear();
88 node.set_comment(comment);
89 self.save_graph(node).await
90 } else {
91 self.save_entity_graph_with_comment(entity, comment).await
92 }
93 }
94 pub async fn save_entity_graph_with_comment<T>(
95 &self,
96 entity: T,
97 comment: impl Into<String>,
98 ) -> Result<GraphNode, RepositoryError<E::Error>>
99 where
100 T: Entity,
101 {
102 let mut node = self
103 .graph_node_from_entity(entity)
104 .map_err(RepositoryError::Runtime)?;
105 node.set_comment(comment);
106 self.save_graph(node).await
107 }
108
109 pub async fn create_entity_graph_with_comment<T>(
113 &self,
114 entity: T,
115 comment: impl Into<String>,
116 ) -> Result<GraphNode, RepositoryError<E::Error>>
117 where
118 T: Entity,
119 {
120 let mut node = self
121 .graph_node_from_entity(entity)
122 .map_err(RepositoryError::Runtime)?;
123 node.operation = GraphOperation::Create;
124 node.set_comment(comment);
125 self.save_graph(node).await
126 }
127
128 pub async fn plan_graph(
129 &self,
130 node: GraphNode,
131 ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
132 if node.entity != self.entity {
133 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
134 "resolved repository {} cannot plan graph root {}",
135 self.entity, node.entity
136 ))));
137 }
138 let mut node = node;
139 let mut plan = GraphMutationPlan::default();
140 self.collect_graph_plan(&mut node, &mut plan, None, None, false).await?;
141 plan.planned_root = Some(node);
142 plan.rebuild_batches();
143 Ok(plan)
144 }
145
146 pub async fn execute_graph_plan(
147 &self,
148 plan: GraphMutationPlan,
149 ) -> Result<GraphNode, RepositoryError<E::Error>> {
150 let Some(root) = plan.planned_root else {
151 return Err(RepositoryError::Runtime(RuntimeError::Graph(
152 "graph mutation plan has no planned root".to_owned(),
153 )));
154 };
155
156 for batch in plan.batches {
157 if batch.items.is_empty() {
158 continue;
159 }
160 match batch.kind {
161 GraphMutationKind::Create => {
162 let mut cmd = teaql_core::BatchInsertCommand::new(&batch.entity);
163 for item in batch.items {
164 cmd.batch_values.push(item.values);
165 if let Some(token) = item.scope_token {
166 cmd.trace_chains.push(token.recover_trace_chain());
167 } else {
168 cmd.trace_chains.push(Vec::new());
169 }
170 }
171 self.execute_prepared_batch_insert(cmd).await?;
172 }
173 GraphMutationKind::Update => {
174 let mut cmd = teaql_core::BatchUpdateCommand::new(&batch.entity, batch.update_fields);
175 for item in batch.items {
176 let id = item.values.get("id").cloned().ok_or_else(|| {
177 RepositoryError::Runtime(RuntimeError::Graph(format!(
178 "update item in batch missing id for {}", batch.entity
179 )))
180 })?;
181 let version = item.values.get("version").and_then(|v| {
182 if let teaql_core::Value::I64(n) = v { Some(*n) } else { None }
183 });
184 cmd.batch_values.push(item.values);
185 cmd.batch_ids.push(id);
186 cmd.batch_expected_versions.push(version);
187 cmd.batch_old_values.push(item.old_values);
188 if let Some(token) = item.scope_token {
189 cmd.trace_chains.push(token.recover_trace_chain());
190 } else {
191 cmd.trace_chains.push(Vec::new());
192 }
193 }
194 self.execute_prepared_batch_update(cmd).await?;
195 }
196 GraphMutationKind::Delete => {
197 for item in batch.items {
199 let id = item.values.get("id").cloned().ok_or_else(|| {
200 RepositoryError::Runtime(RuntimeError::Graph(format!(
201 "delete item in batch missing id for {}", batch.entity
202 )))
203 })?;
204 let mut cmd = teaql_core::DeleteCommand::new(&batch.entity, id);
205 if let Some(teaql_core::Value::I64(version)) = item.values.get("version") {
206 cmd = cmd.expected_version(*version);
207 }
208 let trace_chain = if let Some(token) = item.scope_token { token.recover_trace_chain() } else { Vec::new() };
209 self.delete_scoped(&cmd, trace_chain).await?;
210 }
211 }
212 GraphMutationKind::Reference => {
213 }
215 }
216 }
217
218 Ok(root)
219 }
220
221 pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
222 where
223 T: Entity,
224 {
225 let descriptor = T::entity_descriptor();
226 if descriptor.name != self.entity {
227 return Err(RuntimeError::Graph(format!(
228 "resolved repository {} cannot extract graph root {}",
229 self.entity, descriptor.name
230 )));
231 }
232 let dirty_fields = entity.dirty_fields();
235 let original_values = entity.original_values();
236 let is_deleted = entity.is_marked_as_delete();
237 let comment = entity.get_comment();
238 let mut node = self.graph_node_from_record(&descriptor.name, entity.into_record())?;
239 node.dirty_fields = dirty_fields;
240 node.original_values = original_values;
241 if is_deleted {
242 node.operation = GraphOperation::Remove;
243 node.relations.clear();
244 }
245 if let Some(c) = comment {
246 node.set_comment(c);
247 }
248 Ok(node)
249 }
250
251 fn collect_graph_plan<'b, 's: 'b>(
252 &'b self,
253 node: &'b mut GraphNode,
254 plan: &'b mut GraphMutationPlan,
255 parent_scope: Option<&'s ScopedCommentNode<'s>>,
256 parent_token: Option<Arc<TraceScopeToken>>,
257 parent_is_create: bool,
258 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + '_>> {
259 Box::pin(async move {
260 match node.operation {
261 GraphOperation::Reference => {
262 plan.push(
263 node.entity.clone(),
264 GraphMutationKind::Reference,
265 node.values.clone(),
266 Vec::new(),
267 parent_token,
268 node.original_values.clone(),
269 );
270 return Ok(());
271 }
272 GraphOperation::Remove => {
273 plan.push(
274 node.entity.clone(),
275 GraphMutationKind::Delete,
276 node.values.clone(),
277 Vec::new(),
278 parent_token,
279 node.original_values.clone(),
280 );
281 return Ok(());
282 }
283 GraphOperation::Upsert | GraphOperation::Create => {}
284 }
285
286 let descriptor = self
287 .repository
288 .metadata
289 .context
290 .require_entity(&node.entity)
291 .map_err(RepositoryError::Runtime)?;
292
293 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
295 parent: parent_scope,
296 track: teaql_core::TraceNode {
297 entity_type: node.entity.clone(),
298 entity_id: node.id().and_then(|v| match v {
299 Value::U64(n) => Some(*n),
300 Value::I64(n) => Some(*n as u64),
301 _ => None,
302 }),
303 comment: c.clone(),
304 },
305 });
306 let active_scope = current_scope.as_ref().or(parent_scope);
307
308 let id_property = descriptor.id_property().cloned();
309 let id = id_property.as_ref().and_then(|property| {
310 node.values
311 .get(&property.name)
312 .filter(|value| !is_unassigned_id_value(value))
313 .cloned()
314 });
315
316 let is_create_op = node.operation == GraphOperation::Create || (parent_is_create && node.operation == GraphOperation::Upsert);
317
318 let is_update = if is_create_op {
319 false
320 } else {
321 match (id_property.as_ref(), id.as_ref()) {
322 (Some(id_property), Some(id)) => self
323 .fetch_graph_current_row(&node.entity, &id_property.name, id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?
324 .is_some(),
325 _ => false,
326 }
327 };
328 if !is_update {
329 if let Some(id_property) = id_property.as_ref() {
330 let needs_id = !node.values.contains_key(&id_property.name)
331 || node
332 .values
333 .get(&id_property.name)
334 .is_some_and(is_unassigned_id_value);
335 if needs_id {
336 let id = self
337 .repository
338 .metadata
339 .context
340 .next_id(&node.entity)
341 .map_err(RepositoryError::Runtime)?;
342 node.values.insert(id_property.name.clone(), Value::U64(id));
343 }
344 }
345 ensure_initial_version(&mut node.values, descriptor);
346 }
347 let update_fields = if is_update {
348 let mut excluded = Vec::new();
349 if let Some(id_property) = id_property.as_ref() {
350 excluded.push(id_property.name.clone());
351 }
352 if let Some(version_property) = descriptor.version_property() {
353 excluded.push(version_property.name.clone());
354 }
355 let mut fields = sorted_update_fields(&node.values, excluded);
356 if let Some(dirty) = &node.dirty_fields {
357 fields.retain(|f| dirty.contains(f));
358 }
359 fields
360 } else {
361 Vec::new()
362 };
363
364 let current_token = if let Some(c) = &node.comment {
367 Some(Arc::new(TraceScopeToken {
368 parent: parent_token.clone(),
369 track: teaql_core::TraceNode {
370 entity_type: node.entity.clone(),
371 entity_id: node.id().and_then(|v| match v {
372 Value::U64(n) => Some(*n),
373 Value::I64(n) => Some(*n as u64),
374 _ => None,
375 }),
376 comment: c.clone(),
377 },
378 node_index: plan.next_item_index,
379 }))
380 } else {
381 parent_token.clone()
382 };
383
384 plan.push(
385 node.entity.clone(),
386 if is_update {
387 GraphMutationKind::Update
388 } else {
389 GraphMutationKind::Create
390 },
391 node.values.clone(),
392 update_fields,
393 current_token.clone(),
394 node.original_values.clone(),
395 );
396
397 for (name, children) in &mut node.relations {
398 let relation = descriptor.relation_by_name(name).ok_or_else(|| {
399 RepositoryError::Runtime(RuntimeError::MissingRelation {
400 entity: node.entity.clone(),
401 relation: name.clone(),
402 })
403 })?;
404 let child_repo = self.scoped_repository(relation.target_entity.clone());
405 for child in children {
406 ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
407 child_repo.collect_graph_plan(child, plan, active_scope, current_token.clone(), is_create_op).await?;
408 }
409 }
410 Ok(())
411 })
412 }
413
414 fn insert_graph_node_scoped<'b, 's: 'b>(
415 &'b self,
416 mut node: GraphNode,
417 parent_scope: Option<&'s ScopedCommentNode<'s>>,
418 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<GraphNode, RepositoryError<E::Error>>> + Send + '_>> {
419 Box::pin(async move {
420 match node.operation {
421 GraphOperation::Upsert | GraphOperation::Create => {}
422 GraphOperation::Reference => return self.validate_reference_node(node, parent_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await,
423 GraphOperation::Remove => {
424 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
425 "create graph cannot remove node {}",
426 node.entity
427 ))));
428 }
429 }
430
431 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
433 parent: parent_scope,
434 track: teaql_core::TraceNode {
435 entity_type: node.entity.clone(),
436 entity_id: node
437 .id()
438 .and_then(|v| match v {
439 Value::U64(n) => Some(*n),
440 Value::I64(n) => Some(*n as u64),
441 _ => None,
442 }),
443 comment: c.clone(),
444 },
445 });
446 let active_scope = current_scope.as_ref().or(parent_scope);
447
448 let descriptor = self
449 .repository
450 .metadata
451 .context
452 .require_entity(&node.entity)
453 .map_err(RepositoryError::Runtime)?;
454
455 let mut one_relations = Vec::new();
456 let mut many_relations = Vec::new();
457 for (name, children) in std::mem::take(&mut node.relations) {
458 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
459 RepositoryError::Runtime(RuntimeError::MissingRelation {
460 entity: node.entity.clone(),
461 relation: name.clone(),
462 })
463 })?;
464 if relation.many {
465 many_relations.push((name, relation.clone(), children));
466 } else {
467 one_relations.push((name, relation.clone(), children));
468 }
469 }
470
471 for (name, relation, children) in one_relations {
472 if children.len() > 1 {
473 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
474 "relation {}.{} expects one child, got {}",
475 node.entity,
476 name,
477 children.len()
478 ))));
479 }
480 let mut saved_children = Vec::new();
481 for child in children {
482 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
483 let child_repo = self.scoped_repository(child.entity.clone());
484 let saved_child = child_repo.insert_graph_node_scoped(child, active_scope).await?;
485 if relation.attach {
486 let foreign_value = saved_child
487 .values
488 .get(&relation.foreign_key)
489 .cloned()
490 .ok_or_else(|| {
491 RepositoryError::Runtime(RuntimeError::Graph(format!(
492 "saved child {} missing foreign key {} for relation {}.{}",
493 relation.target_entity, relation.foreign_key, node.entity, name
494 )))
495 })?;
496 node.values
497 .insert(relation.local_key.clone(), foreign_value);
498 }
499 saved_children.push(saved_child);
500 }
501 node.relations.insert(name, saved_children);
502 }
503
504 let command = self
505 .prepare_insert_command(&InsertCommand {
506 entity: node.entity.clone(),
507 values: node.values.clone(),
508 trace_chain: Vec::new(),
509 })
510 .map_err(RepositoryError::Runtime)?;
511 let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
512 self.execute_prepared_insert_with_comment(command.clone(), lineage).await?;
513 node.values = command.values;
514
515 for (name, relation, children) in many_relations {
516 let local_value = node
517 .values
518 .get(&relation.local_key)
519 .cloned()
520 .ok_or_else(|| {
521 RepositoryError::Runtime(RuntimeError::Graph(format!(
522 "parent {} missing local key {} for relation {}",
523 node.entity, relation.local_key, name
524 )))
525 })?;
526 let mut saved_children = Vec::new();
527 for mut child in children {
528 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
529 if relation.attach {
530 child
531 .values
532 .insert(relation.foreign_key.clone(), local_value.clone());
533 }
534 let child_repo = self.scoped_repository(child.entity.clone());
535 saved_children.push(child_repo.insert_graph_node_scoped(child, active_scope).await?);
536 }
537 node.relations.insert(name, saved_children);
538 }
539
540 Ok(node)
541 })
542 }
543
544 fn upsert_graph_node_scoped<'b, 's: 'b>(
545 &'b self,
546 mut node: GraphNode,
547 parent_scope: Option<&'s ScopedCommentNode<'s>>,
548 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<GraphNode, RepositoryError<E::Error>>> + Send + '_>> {
549 Box::pin(async move {
550 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
552 parent: parent_scope,
553 track: teaql_core::TraceNode {
554 entity_type: node.entity.clone(),
555 entity_id: node
556 .id()
557 .and_then(|v| match v {
558 Value::U64(n) => Some(*n),
559 Value::I64(n) => Some(*n as u64),
560 _ => None,
561 }),
562 comment: c.clone(),
563 },
564 });
565 let active_scope = current_scope.as_ref().or(parent_scope);
566
567 match node.operation {
568 GraphOperation::Upsert | GraphOperation::Create => {}
569 GraphOperation::Reference => return self.validate_reference_node(node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await,
570 GraphOperation::Remove => {
571 self.validate_remove_node(&node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?;
572 self.delete_graph_node(&node, parent_scope).await?;
573 return Ok(node);
574 }
575 }
576
577 let descriptor = self
578 .repository
579 .metadata
580 .context
581 .require_entity(&node.entity)
582 .map_err(RepositoryError::Runtime)?;
583 let Some(id_property) = descriptor.id_property() else {
584 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
585 "entity {} has no id property for graph upsert",
586 node.entity
587 ))));
588 };
589 let Some(id) = node
590 .values
591 .get(&id_property.name)
592 .filter(|value| !is_unassigned_id_value(value))
593 .cloned()
594 else {
595 node.comment = None;
597 return self.insert_graph_node_scoped(node, active_scope).await;
598 };
599
600 if node.operation == GraphOperation::Create || self
601 .fetch_graph_current_row(&node.entity, &id_property.name, &id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?
602 .is_none()
603 {
604 node.comment = None;
605 return self.insert_graph_node_scoped(node, active_scope).await;
606 }
607
608 let mut one_relations = Vec::new();
609 let mut many_relations = Vec::new();
610 for (name, children) in std::mem::take(&mut node.relations) {
611 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
612 RepositoryError::Runtime(RuntimeError::MissingRelation {
613 entity: node.entity.clone(),
614 relation: name.clone(),
615 })
616 })?;
617 if relation.many {
618 many_relations.push((name, relation.clone(), children));
619 } else {
620 one_relations.push((name, relation.clone(), children));
621 }
622 }
623
624 for (name, relation, children) in one_relations {
625 if children.len() > 1 {
626 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
627 "relation {}.{} expects one child, got {}",
628 node.entity,
629 name,
630 children.len()
631 ))));
632 }
633 let mut saved_children = Vec::new();
634 for child in children {
635 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
636 let child_repo = self.scoped_repository(child.entity.clone());
637 let saved_child = child_repo.upsert_graph_node_scoped(child, active_scope).await?;
638 if relation.attach {
639 let foreign_value = saved_child
640 .values
641 .get(&relation.foreign_key)
642 .cloned()
643 .ok_or_else(|| {
644 RepositoryError::Runtime(RuntimeError::Graph(format!(
645 "saved child {} missing foreign key {} for relation {}.{}",
646 relation.target_entity, relation.foreign_key, node.entity, name
647 )))
648 })?;
649 node.values
650 .insert(relation.local_key.clone(), foreign_value);
651 }
652 saved_children.push(saved_child);
653 }
654 node.relations.insert(name, saved_children);
655 }
656
657 let update = self.graph_update_command(&mut node, descriptor, id_property, &id)?;
658 if !update.values.is_empty() {
659 let prepared_update = self
660 .prepare_update_command(&update)
661 .map_err(RepositoryError::Runtime)?;
662 let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
663 self.execute_prepared_update_with_comment(prepared_update.clone(), lineage).await?;
664 for (field, value) in &prepared_update.values {
665 node.values.insert(field.clone(), value.clone());
666 }
667 if let Some(version_property) = descriptor.version_property() {
668 if let Some(expected_version) = prepared_update.expected_version {
669 node.values.insert(
670 version_property.name.clone(),
671 Value::I64(expected_version + 1),
672 );
673 }
674 }
675 }
676
677 for (name, relation, children) in many_relations {
678 let local_value = node
679 .values
680 .get(&relation.local_key)
681 .cloned()
682 .ok_or_else(|| {
683 RepositoryError::Runtime(RuntimeError::Graph(format!(
684 "parent {} missing local key {} for relation {}",
685 node.entity, relation.local_key, name
686 )))
687 })?;
688 let child_repo = self.scoped_repository(relation.target_entity.clone());
689 let child_descriptor = self
690 .repository
691 .metadata
692 .context
693 .require_entity(&relation.target_entity)
694 .map_err(RepositoryError::Runtime)?;
695 let child_id_property = child_descriptor.id_property().ok_or_else(|| {
696 RepositoryError::Runtime(RuntimeError::Graph(format!(
697 "entity {} has no id property",
698 relation.target_entity
699 )))
700 })?;
701
702 let mut seen = std::collections::BTreeSet::new();
703 let mut saved_children = Vec::new();
704 for mut child in children {
705 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
706 if relation.attach && child.operation != GraphOperation::Reference {
707 child
708 .values
709 .insert(relation.foreign_key.clone(), local_value.clone());
710 }
711 if let Some(child_id) = child
712 .values
713 .get(&child_id_property.name)
714 .filter(|value| !is_unassigned_id_value(value))
715 {
716 let key = graph_identity_key(child_id);
717 if !seen.insert(key.clone()) {
718 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
719 "duplicate child id {key} in relation {}.{}",
720 node.entity, name
721 ))));
722 }
723 }
724 saved_children.push(child_repo.upsert_graph_node_scoped(child, active_scope).await?);
725 }
726
727
728
729 node.relations.insert(name, saved_children);
730 }
731
732 Ok(node)
733 })
734 }
735
736 async fn validate_reference_node(
737 &self,
738 node: GraphNode,
739 trace_chain: Vec<teaql_core::TraceNode>,
740 ) -> Result<GraphNode, RepositoryError<E::Error>> {
741 if !node.relations.is_empty() {
742 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
743 "reference node {} cannot contain child relations",
744 node.entity
745 ))));
746 }
747 let descriptor = self
748 .repository
749 .metadata
750 .context
751 .require_entity(&node.entity)
752 .map_err(RepositoryError::Runtime)?;
753 let id_property = descriptor.id_property().ok_or_else(|| {
754 RepositoryError::Runtime(RuntimeError::Graph(format!(
755 "entity {} has no id property for graph reference",
756 node.entity
757 )))
758 })?;
759 let id = node
760 .values
761 .get(&id_property.name)
762 .filter(|value| !is_unassigned_id_value(value))
763 .cloned()
764 .ok_or_else(|| {
765 RepositoryError::Runtime(RuntimeError::Graph(format!(
766 "reference node {} missing id property {}",
767 node.entity, id_property.name
768 )))
769 })?;
770
771 for field in node.values.keys() {
772 if field == &id_property.name {
773 continue;
774 }
775 if descriptor
776 .version_property()
777 .map(|property| field == &property.name)
778 .unwrap_or(false)
779 {
780 continue;
781 }
782 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
783 "reference node {} cannot carry mutable field {}",
784 node.entity, field
785 ))));
786 }
787
788 let current = self
789 .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain).await?
790 .ok_or_else(|| {
791 RepositoryError::Runtime(RuntimeError::Graph(format!(
792 "reference node {}({}) does not exist",
793 node.entity,
794 graph_identity_key(&id)
795 )))
796 })?;
797
798 if let Some(version_property) = descriptor.version_property() {
799 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
800 if *existing_version < 0 {
801 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
802 "reference node {}({}) is deleted",
803 node.entity,
804 graph_identity_key(&id)
805 ))));
806 }
807 if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
808 {
809 if expected_version != existing_version {
810 return Err(RepositoryError::Runtime(
811 RuntimeError::OptimisticLockConflict {
812 entity: node.entity,
813 id: graph_identity_key(&id),
814 },
815 ));
816 }
817 }
818 }
819 }
820
821 Ok(GraphNode {
822 entity: node.entity,
823 values: current,
824 relations: BTreeMap::new(),
825 operation: GraphOperation::Reference,
826 comment: None,
827 dirty_fields: None, original_values: None,
828 })
829 }
830
831 async fn validate_remove_node(&self, node: &GraphNode, trace_chain: Vec<teaql_core::TraceNode>) -> Result<(), RepositoryError<E::Error>> {
832 if !node.relations.is_empty() {
833 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
834 "remove node {} cannot contain child relations",
835 node.entity
836 ))));
837 }
838 let descriptor = self
839 .repository
840 .metadata
841 .context
842 .require_entity(&node.entity)
843 .map_err(RepositoryError::Runtime)?;
844 let id_property = descriptor.id_property().ok_or_else(|| {
845 RepositoryError::Runtime(RuntimeError::Graph(format!(
846 "entity {} has no id property for graph remove",
847 node.entity
848 )))
849 })?;
850 let id = node
851 .values
852 .get(&id_property.name)
853 .filter(|value| !is_unassigned_id_value(value))
854 .cloned()
855 .ok_or_else(|| {
856 RepositoryError::Runtime(RuntimeError::Graph(format!(
857 "remove node {} missing id property {}",
858 node.entity, id_property.name
859 )))
860 })?;
861 let current = self
862 .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain).await?
863 .ok_or_else(|| {
864 RepositoryError::Runtime(RuntimeError::Graph(format!(
865 "remove node {}({}) does not exist",
866 node.entity,
867 graph_identity_key(&id)
868 )))
869 })?;
870 if let Some(version_property) = descriptor.version_property() {
871 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
872 if *existing_version < 0 {
873 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
874 "remove node {}({}) is already deleted",
875 node.entity,
876 graph_identity_key(&id)
877 ))));
878 }
879 }
880 }
881 Ok(())
882 }
883
884 fn graph_node_from_record(
885 &self,
886 entity: &str,
887 record: Record,
888 ) -> Result<GraphNode, RuntimeError> {
889 let descriptor = self.repository.metadata.context.require_entity(entity)?;
890 let mut node = GraphNode::new(entity);
891
892 for (field, value) in record {
893 if field == "_comment" {
894 if let Value::Text(comment) = value {
895 node.set_comment(comment);
896 }
897 continue;
898 }
899 let Some(relation) = descriptor.relation_by_name(&field) else {
900 node.values.insert(field, value);
901 continue;
902 };
903
904 match value {
905 Value::Null => {
906 node.relations.entry(field).or_default();
907 }
908 Value::Object(record) => {
909 let child = self.graph_node_from_record(&relation.target_entity, record)?;
910 node.relations.entry(field).or_default().push(child);
911 }
912 Value::List(values) => {
913 let children = node.relations.entry(field.clone()).or_default();
914 for value in values {
915 let Value::Object(record) = value else {
916 return Err(RuntimeError::Graph(format!(
917 "relation {}.{} expects object children, got {:?}",
918 entity, field, value
919 )));
920 };
921 children
922 .push(self.graph_node_from_record(&relation.target_entity, record)?);
923 }
924 }
925 other => {
926 return Err(RuntimeError::Graph(format!(
927 "relation {}.{} expects object/list/null, got {:?}",
928 entity, field, other
929 )));
930 }
931 }
932 }
933
934 Ok(node)
935 }
936
937 fn graph_update_command(
938 &self,
939 node: &mut GraphNode,
940 descriptor: &EntityDescriptor,
941 id_property: &PropertyDescriptor,
942 id: &Value,
943 ) -> Result<UpdateCommand, RepositoryError<E::Error>> {
944 crate::mark_record_status(&mut node.values, crate::CheckObjectStatus::Update);
945 let check_result = self
946 .repository
947 .metadata
948 .context
949 .check_and_fix_record(&node.entity, &mut node.values);
950 crate::clear_record_status(&mut node.values);
951 check_result.map_err(RepositoryError::Runtime)?;
952
953 let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
954 command.old_values = node.original_values.clone();
955 if let Some(version_property) = descriptor.version_property() {
956 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
957 command = command.expected_version(*version);
958 }
959 }
960 for property in descriptor.properties.iter().filter(|property| {
964 !property.is_id
965 && !property.is_version
966 && property.name != id_property.name
967 && match &node.dirty_fields {
968 Some(dirty) => dirty.contains(&property.name),
969 None => node.values.contains_key(&property.name),
970 }
971 }) {
972 if let Some(value) = node.values.get(&property.name) {
973 command.values.insert(property.name.clone(), value.clone());
974 }
975 }
976 Ok(command)
977 }
978
979 fn delete_graph_node<'b, 's: 'b>(
980 &'b self,
981 node: &'b GraphNode,
982 parent_scope: Option<&'s ScopedCommentNode<'s>>,
983 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, RepositoryError<E::Error>>> + Send + '_>> {
984 Box::pin(async move {
985 let descriptor = self
986 .repository
987 .metadata
988 .context
989 .require_entity(&node.entity)
990 .map_err(RepositoryError::Runtime)?;
991 let id_property = descriptor.id_property().ok_or_else(|| {
992 RepositoryError::Runtime(RuntimeError::Graph(format!(
993 "entity {} has no id property for graph remove",
994 node.entity
995 )))
996 })?;
997 let id = node
998 .values
999 .get(&id_property.name)
1000 .filter(|value| !is_unassigned_id_value(value))
1001 .cloned()
1002 .ok_or_else(|| {
1003 RepositoryError::Runtime(RuntimeError::Graph(format!(
1004 "remove node {} missing id property {}",
1005 node.entity, id_property.name
1006 )))
1007 })?;
1008 let mut delete = DeleteCommand::new(node.entity.clone(), id);
1009 if let Some(version_property) = descriptor.version_property() {
1010 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
1011 delete = delete.expected_version(*version);
1012 }
1013 }
1014
1015 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
1017 parent: parent_scope,
1018 track: teaql_core::TraceNode {
1019 entity_type: node.entity.clone(),
1020 entity_id: node
1021 .id()
1022 .and_then(|v| match v {
1023 Value::U64(n) => Some(*n),
1024 Value::I64(n) => Some(*n as u64),
1025 _ => None,
1026 }),
1027 comment: c.clone(),
1028 },
1029 });
1030 let active_scope = current_scope.as_ref().or(parent_scope);
1031 let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
1032
1033 self.delete_scoped(&delete, lineage).await
1034 })
1035 }
1036
1037 pub async fn fetch_graph_current_row(
1038 &self,
1039 entity: &str,
1040 id_property: &str,
1041 id: &Value,
1042 trace_chain: Vec<teaql_core::TraceNode>,
1043 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
1044 let mut query = SelectQuery::new(entity).filter(Expr::eq(id_property, id.clone()));
1045 query.trace_chain = trace_chain;
1046 let mut rows = self
1047 .scoped_repository(entity.to_owned())
1048 .fetch_all(&query).await?;
1049 Ok(rows.pop())
1050 }
1051
1052 async fn fetch_graph_children(
1053 &self,
1054 entity: &str,
1055 foreign_key: &str,
1056 parent_value: &Value,
1057 trace_chain: Vec<teaql_core::TraceNode>,
1058 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
1059 let mut query = SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone()));
1060 query.trace_chain = trace_chain;
1061 self.scoped_repository(entity.to_owned()).fetch_all(&query).await
1062 }
1063}