1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use teaql_core::{
5 DeleteCommand, Entity, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record,
6 SelectQuery, UpdateCommand, Value,
7};
8
9use crate::{
10 GraphMutationKind, GraphMutationPlan, GraphNode, GraphOperation, RepositoryError,
11 RuntimeError, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
12};
13use crate::entity_status::EntityStatus;
14
15use super::{ResolvedRepository, helpers::*};
16
17impl<'a, E> ResolvedRepository<'a, E>
18where
19 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
20{
21 pub async fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
22 if node.entity != self.entity {
23 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
24 "resolved repository {} cannot save graph root {}",
25 self.entity, node.entity
26 ))));
27 }
28 self.upsert_graph_node_scoped(node, None).await
29 }
30
31 pub async fn save_entity_graph_from(&self, graph: teaql_core::EntityGraph) -> Result<GraphNode, RepositoryError<E::Error>> {
32 fn convert(node: teaql_core::EntityGraphNode) -> GraphNode {
33 let mut relations = BTreeMap::new();
34 for (rel_name, child) in node.children {
35 relations.entry(rel_name).or_insert_with(Vec::new).push(convert(child));
36 }
37 GraphNode {
38 entity: node.entity_type,
39 values: node.record,
40 relations,
41 operation: match node.operation {
42 teaql_core::EntityGraphOperation::Save => crate::GraphOperation::Upsert,
43 teaql_core::EntityGraphOperation::Delete => crate::GraphOperation::Remove,
44 },
45 comment: node.comment,
46 dirty_fields: None, original_values: None,
47 }
48 }
49 self.save_graph(convert(graph.root)).await
50 }
51
52 pub async fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
53 where
54 T: Entity,
55 {
56 let node = self
57 .graph_node_from_entity(entity)
58 .map_err(RepositoryError::Runtime)?;
59 self.save_graph(node).await
60 }
61
62 pub async fn save_entity<T>(&self, entity: T, status: EntityStatus) -> Result<GraphNode, RepositoryError<E::Error>>
63 where
64 T: Entity,
65 {
66 if !status.need_persist() {
67 return Ok(GraphNode::new(&self.entity));
68 }
69 if status.is_deleted() {
70 let mut node = self.graph_node_from_entity(entity)
71 .map_err(RepositoryError::Runtime)?;
72 node.operation = GraphOperation::Remove;
73 node.relations.clear();
74 self.save_graph(node).await
75 } else {
76 self.save_entity_graph(entity).await
77 }
78 }
79 pub async fn save_entity_with_comment<T>(&self, entity: T, status: EntityStatus, comment: impl Into<String>) -> Result<GraphNode, RepositoryError<E::Error>>
80 where
81 T: Entity,
82 {
83 if status.is_deleted() {
84 let mut node = self.graph_node_from_entity(entity)
85 .map_err(RepositoryError::Runtime)?;
86 node.operation = GraphOperation::Remove;
87 node.relations.clear();
88 node.set_comment(comment);
89 self.save_graph(node).await
90 } else {
91 self.save_entity_graph_with_comment(entity, comment).await
92 }
93 }
94 pub async fn save_entity_graph_with_comment<T>(
95 &self,
96 entity: T,
97 comment: impl Into<String>,
98 ) -> Result<GraphNode, RepositoryError<E::Error>>
99 where
100 T: Entity,
101 {
102 let mut node = self
103 .graph_node_from_entity(entity)
104 .map_err(RepositoryError::Runtime)?;
105 node.set_comment(comment);
106 self.save_graph(node).await
107 }
108
109 pub async fn create_entity_graph_with_comment<T>(
113 &self,
114 entity: T,
115 comment: impl Into<String>,
116 ) -> Result<GraphNode, RepositoryError<E::Error>>
117 where
118 T: Entity,
119 {
120 let mut node = self
121 .graph_node_from_entity(entity)
122 .map_err(RepositoryError::Runtime)?;
123 node.operation = GraphOperation::Create;
124 node.set_comment(comment);
125 self.save_graph(node).await
126 }
127
128 pub async fn plan_graph(
129 &self,
130 node: GraphNode,
131 ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
132 if node.entity != self.entity {
133 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
134 "resolved repository {} cannot plan graph root {}",
135 self.entity, node.entity
136 ))));
137 }
138 let mut node = node;
139 let mut plan = GraphMutationPlan::default();
140 self.collect_graph_plan(&mut node, &mut plan, None, None, false).await?;
141 plan.planned_root = Some(node);
142 plan.rebuild_batches();
143 Ok(plan)
144 }
145
146 pub async fn execute_graph_plan(
147 &self,
148 plan: GraphMutationPlan,
149 ) -> Result<GraphNode, RepositoryError<E::Error>> {
150 let Some(root) = plan.planned_root else {
151 return Err(RepositoryError::Runtime(RuntimeError::Graph(
152 "graph mutation plan has no planned root".to_owned(),
153 )));
154 };
155
156 for batch in plan.batches {
157 if batch.items.is_empty() {
158 continue;
159 }
160 match batch.kind {
161 GraphMutationKind::Create => {
162 let mut cmd = teaql_core::BatchInsertCommand::new(&batch.entity);
163 for item in batch.items {
164 cmd.batch_values.push(item.values);
165 if let Some(token) = item.scope_token {
166 cmd.trace_chains.push(token.recover_trace_chain());
167 } else {
168 cmd.trace_chains.push(Vec::new());
169 }
170 }
171 self.execute_prepared_batch_insert(cmd).await?;
172 }
173 GraphMutationKind::Update => {
174 let mut cmd = teaql_core::BatchUpdateCommand::new(&batch.entity, batch.update_fields);
175 for item in batch.items {
176 let id = item.values.get("id").cloned().ok_or_else(|| {
177 RepositoryError::Runtime(RuntimeError::Graph(format!(
178 "update item in batch missing id for {}", batch.entity
179 )))
180 })?;
181 let version = item.values.get("version").and_then(|v| {
182 if let teaql_core::Value::I64(n) = v { Some(*n) } else { None }
183 });
184 cmd.batch_values.push(item.values);
185 cmd.batch_ids.push(id);
186 cmd.batch_expected_versions.push(version);
187 cmd.batch_old_values.push(item.old_values);
188 if let Some(token) = item.scope_token {
189 cmd.trace_chains.push(token.recover_trace_chain());
190 } else {
191 cmd.trace_chains.push(Vec::new());
192 }
193 }
194 self.execute_prepared_batch_update(cmd).await?;
195 }
196 GraphMutationKind::Delete => {
197 for item in batch.items {
199 let id = item.values.get("id").cloned().ok_or_else(|| {
200 RepositoryError::Runtime(RuntimeError::Graph(format!(
201 "delete item in batch missing id for {}", batch.entity
202 )))
203 })?;
204 let mut cmd = teaql_core::DeleteCommand::new(&batch.entity, id);
205 if let Some(teaql_core::Value::I64(version)) = item.values.get("version") {
206 cmd = cmd.expected_version(*version);
207 }
208 let trace_chain = if let Some(token) = item.scope_token { token.recover_trace_chain() } else { Vec::new() };
209 self.delete_scoped(&cmd, trace_chain).await?;
210 }
211 }
212 GraphMutationKind::Reference => {
213 }
215 }
216 }
217
218 Ok(root)
219 }
220
221 pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
222 where
223 T: Entity,
224 {
225 let descriptor = T::entity_descriptor();
226 if descriptor.name != self.entity {
227 return Err(RuntimeError::Graph(format!(
228 "resolved repository {} cannot extract graph root {}",
229 self.entity, descriptor.name
230 )));
231 }
232 let dirty_fields = entity.dirty_fields();
235 let original_values = entity.original_values();
236 let is_deleted = entity.is_marked_as_delete();
237 let comment = entity.get_comment();
238 let mut node = self.graph_node_from_record(&descriptor.name, entity.into_record())?;
239 node.dirty_fields = dirty_fields;
240 node.original_values = original_values;
241 if is_deleted {
242 node.operation = GraphOperation::Remove;
243 node.relations.clear();
244 }
245 if let Some(c) = comment {
246 node.set_comment(c);
247 }
248 Ok(node)
249 }
250
251 fn collect_graph_plan<'b, 's: 'b>(
252 &'b self,
253 node: &'b mut GraphNode,
254 plan: &'b mut GraphMutationPlan,
255 parent_scope: Option<&'s ScopedCommentNode<'s>>,
256 parent_token: Option<Arc<TraceScopeToken>>,
257 parent_is_create: bool,
258 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + '_>> {
259 Box::pin(async move {
260 match node.operation {
261 GraphOperation::Reference => {
262 plan.push(
263 node.entity.clone(),
264 GraphMutationKind::Reference,
265 node.values.clone(),
266 Vec::new(),
267 parent_token,
268 node.original_values.clone(),
269 );
270 return Ok(());
271 }
272 GraphOperation::Remove => {
273 plan.push(
274 node.entity.clone(),
275 GraphMutationKind::Delete,
276 node.values.clone(),
277 Vec::new(),
278 parent_token,
279 node.original_values.clone(),
280 );
281 return Ok(());
282 }
283 GraphOperation::Upsert | GraphOperation::Create => {}
284 }
285
286 let descriptor = self
287 .repository
288 .metadata
289 .context
290 .require_entity(&node.entity)
291 .map_err(RepositoryError::Runtime)?;
292
293 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
295 parent: parent_scope,
296 track: teaql_core::TraceNode {
297 entity_type: node.entity.clone(),
298 entity_id: node.id().and_then(|v| match v {
299 Value::U64(n) => Some(*n),
300 Value::I64(n) => Some(*n as u64),
301 _ => None,
302 }),
303 comment: c.clone(),
304 },
305 });
306 let active_scope = current_scope.as_ref().or(parent_scope);
307
308 let id_property = descriptor.id_property().cloned();
309 let id = id_property.as_ref().and_then(|property| {
310 node.values
311 .get(&property.name)
312 .filter(|value| !is_unassigned_id_value(value))
313 .cloned()
314 });
315
316 let is_create_op = node.operation == GraphOperation::Create || (parent_is_create && node.operation == GraphOperation::Upsert);
317
318 let is_update = if is_create_op {
319 false
320 } else {
321 match (id_property.as_ref(), id.as_ref()) {
322 (Some(id_property), Some(id)) => self
323 .fetch_graph_current_row(&node.entity, &id_property.name, id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?
324 .is_some(),
325 _ => false,
326 }
327 };
328 if !is_update {
329 if let Some(id_property) = id_property.as_ref() {
330 let needs_id = !node.values.contains_key(&id_property.name)
331 || node
332 .values
333 .get(&id_property.name)
334 .is_some_and(is_unassigned_id_value);
335 if needs_id {
336 let id = self
337 .repository
338 .metadata
339 .context
340 .next_id(&node.entity)
341 .map_err(RepositoryError::Runtime)?;
342 node.values.insert(id_property.name.clone(), Value::U64(id));
343 }
344 }
345 ensure_initial_version(&mut node.values, descriptor);
346 }
347 let update_fields = if is_update {
348 let mut excluded = Vec::new();
349 if let Some(id_property) = id_property.as_ref() {
350 excluded.push(id_property.name.clone());
351 }
352 if let Some(version_property) = descriptor.version_property() {
353 excluded.push(version_property.name.clone());
354 }
355 sorted_update_fields(&node.values, excluded)
356 } else {
357 Vec::new()
358 };
359
360 let current_token = if let Some(c) = &node.comment {
363 Some(Arc::new(TraceScopeToken {
364 parent: parent_token.clone(),
365 track: teaql_core::TraceNode {
366 entity_type: node.entity.clone(),
367 entity_id: node.id().and_then(|v| match v {
368 Value::U64(n) => Some(*n),
369 Value::I64(n) => Some(*n as u64),
370 _ => None,
371 }),
372 comment: c.clone(),
373 },
374 node_index: plan.next_item_index,
375 }))
376 } else {
377 parent_token.clone()
378 };
379
380 plan.push(
381 node.entity.clone(),
382 if is_update {
383 GraphMutationKind::Update
384 } else {
385 GraphMutationKind::Create
386 },
387 node.values.clone(),
388 update_fields,
389 current_token.clone(),
390 node.original_values.clone(),
391 );
392
393 for (name, children) in &mut node.relations {
394 let relation = descriptor.relation_by_name(name).ok_or_else(|| {
395 RepositoryError::Runtime(RuntimeError::MissingRelation {
396 entity: node.entity.clone(),
397 relation: name.clone(),
398 })
399 })?;
400 let child_repo = self.scoped_repository(relation.target_entity.clone());
401 for child in children {
402 ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
403 child_repo.collect_graph_plan(child, plan, active_scope, current_token.clone(), is_create_op).await?;
404 }
405 }
406 Ok(())
407 })
408 }
409
410 fn insert_graph_node_scoped<'b, 's: 'b>(
411 &'b self,
412 mut node: GraphNode,
413 parent_scope: Option<&'s ScopedCommentNode<'s>>,
414 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<GraphNode, RepositoryError<E::Error>>> + Send + '_>> {
415 Box::pin(async move {
416 match node.operation {
417 GraphOperation::Upsert | GraphOperation::Create => {}
418 GraphOperation::Reference => return self.validate_reference_node(node, parent_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await,
419 GraphOperation::Remove => {
420 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
421 "create graph cannot remove node {}",
422 node.entity
423 ))));
424 }
425 }
426
427 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
429 parent: parent_scope,
430 track: teaql_core::TraceNode {
431 entity_type: node.entity.clone(),
432 entity_id: node
433 .id()
434 .and_then(|v| match v {
435 Value::U64(n) => Some(*n),
436 Value::I64(n) => Some(*n as u64),
437 _ => None,
438 }),
439 comment: c.clone(),
440 },
441 });
442 let active_scope = current_scope.as_ref().or(parent_scope);
443
444 let descriptor = self
445 .repository
446 .metadata
447 .context
448 .require_entity(&node.entity)
449 .map_err(RepositoryError::Runtime)?;
450
451 let mut one_relations = Vec::new();
452 let mut many_relations = Vec::new();
453 for (name, children) in std::mem::take(&mut node.relations) {
454 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
455 RepositoryError::Runtime(RuntimeError::MissingRelation {
456 entity: node.entity.clone(),
457 relation: name.clone(),
458 })
459 })?;
460 if relation.many {
461 many_relations.push((name, relation.clone(), children));
462 } else {
463 one_relations.push((name, relation.clone(), children));
464 }
465 }
466
467 for (name, relation, children) in one_relations {
468 if children.len() > 1 {
469 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
470 "relation {}.{} expects one child, got {}",
471 node.entity,
472 name,
473 children.len()
474 ))));
475 }
476 let mut saved_children = Vec::new();
477 for child in children {
478 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
479 let child_repo = self.scoped_repository(child.entity.clone());
480 let saved_child = child_repo.insert_graph_node_scoped(child, active_scope).await?;
481 if relation.attach {
482 let foreign_value = saved_child
483 .values
484 .get(&relation.foreign_key)
485 .cloned()
486 .ok_or_else(|| {
487 RepositoryError::Runtime(RuntimeError::Graph(format!(
488 "saved child {} missing foreign key {} for relation {}.{}",
489 relation.target_entity, relation.foreign_key, node.entity, name
490 )))
491 })?;
492 node.values
493 .insert(relation.local_key.clone(), foreign_value);
494 }
495 saved_children.push(saved_child);
496 }
497 node.relations.insert(name, saved_children);
498 }
499
500 let command = self
501 .prepare_insert_command(&InsertCommand {
502 entity: node.entity.clone(),
503 values: node.values.clone(),
504 trace_chain: Vec::new(),
505 })
506 .map_err(RepositoryError::Runtime)?;
507 let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
508 self.execute_prepared_insert_with_comment(command.clone(), lineage).await?;
509 node.values = command.values;
510
511 for (name, relation, children) in many_relations {
512 let local_value = node
513 .values
514 .get(&relation.local_key)
515 .cloned()
516 .ok_or_else(|| {
517 RepositoryError::Runtime(RuntimeError::Graph(format!(
518 "parent {} missing local key {} for relation {}",
519 node.entity, relation.local_key, name
520 )))
521 })?;
522 let mut saved_children = Vec::new();
523 for mut child in children {
524 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
525 if relation.attach {
526 child
527 .values
528 .insert(relation.foreign_key.clone(), local_value.clone());
529 }
530 let child_repo = self.scoped_repository(child.entity.clone());
531 saved_children.push(child_repo.insert_graph_node_scoped(child, active_scope).await?);
532 }
533 node.relations.insert(name, saved_children);
534 }
535
536 Ok(node)
537 })
538 }
539
540 fn upsert_graph_node_scoped<'b, 's: 'b>(
541 &'b self,
542 mut node: GraphNode,
543 parent_scope: Option<&'s ScopedCommentNode<'s>>,
544 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<GraphNode, RepositoryError<E::Error>>> + Send + '_>> {
545 Box::pin(async move {
546 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
548 parent: parent_scope,
549 track: teaql_core::TraceNode {
550 entity_type: node.entity.clone(),
551 entity_id: node
552 .id()
553 .and_then(|v| match v {
554 Value::U64(n) => Some(*n),
555 Value::I64(n) => Some(*n as u64),
556 _ => None,
557 }),
558 comment: c.clone(),
559 },
560 });
561 let active_scope = current_scope.as_ref().or(parent_scope);
562
563 match node.operation {
564 GraphOperation::Upsert | GraphOperation::Create => {}
565 GraphOperation::Reference => return self.validate_reference_node(node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await,
566 GraphOperation::Remove => {
567 self.validate_remove_node(&node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?;
568 self.delete_graph_node(&node, parent_scope).await?;
569 return Ok(node);
570 }
571 }
572
573 let descriptor = self
574 .repository
575 .metadata
576 .context
577 .require_entity(&node.entity)
578 .map_err(RepositoryError::Runtime)?;
579 let Some(id_property) = descriptor.id_property() else {
580 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
581 "entity {} has no id property for graph upsert",
582 node.entity
583 ))));
584 };
585 let Some(id) = node
586 .values
587 .get(&id_property.name)
588 .filter(|value| !is_unassigned_id_value(value))
589 .cloned()
590 else {
591 node.comment = None;
593 return self.insert_graph_node_scoped(node, active_scope).await;
594 };
595
596 if node.operation == GraphOperation::Create || self
597 .fetch_graph_current_row(&node.entity, &id_property.name, &id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?
598 .is_none()
599 {
600 node.comment = None;
601 return self.insert_graph_node_scoped(node, active_scope).await;
602 }
603
604 let mut one_relations = Vec::new();
605 let mut many_relations = Vec::new();
606 for (name, children) in std::mem::take(&mut node.relations) {
607 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
608 RepositoryError::Runtime(RuntimeError::MissingRelation {
609 entity: node.entity.clone(),
610 relation: name.clone(),
611 })
612 })?;
613 if relation.many {
614 many_relations.push((name, relation.clone(), children));
615 } else {
616 one_relations.push((name, relation.clone(), children));
617 }
618 }
619
620 for (name, relation, children) in one_relations {
621 if children.len() > 1 {
622 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
623 "relation {}.{} expects one child, got {}",
624 node.entity,
625 name,
626 children.len()
627 ))));
628 }
629 let mut saved_children = Vec::new();
630 for child in children {
631 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
632 let child_repo = self.scoped_repository(child.entity.clone());
633 let saved_child = child_repo.upsert_graph_node_scoped(child, active_scope).await?;
634 if relation.attach {
635 let foreign_value = saved_child
636 .values
637 .get(&relation.foreign_key)
638 .cloned()
639 .ok_or_else(|| {
640 RepositoryError::Runtime(RuntimeError::Graph(format!(
641 "saved child {} missing foreign key {} for relation {}.{}",
642 relation.target_entity, relation.foreign_key, node.entity, name
643 )))
644 })?;
645 node.values
646 .insert(relation.local_key.clone(), foreign_value);
647 }
648 saved_children.push(saved_child);
649 }
650 node.relations.insert(name, saved_children);
651 }
652
653 let update = self.graph_update_command(&mut node, descriptor, id_property, &id)?;
654 if !update.values.is_empty() || update.expected_version.is_some() {
655 let prepared_update = self
656 .prepare_update_command(&update)
657 .map_err(RepositoryError::Runtime)?;
658 let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
659 self.execute_prepared_update_with_comment(prepared_update.clone(), lineage).await?;
660 for (field, value) in &prepared_update.values {
661 node.values.insert(field.clone(), value.clone());
662 }
663 if let Some(version_property) = descriptor.version_property() {
664 if let Some(expected_version) = prepared_update.expected_version {
665 node.values.insert(
666 version_property.name.clone(),
667 Value::I64(expected_version + 1),
668 );
669 }
670 }
671 }
672
673 for (name, relation, children) in many_relations {
674 let local_value = node
675 .values
676 .get(&relation.local_key)
677 .cloned()
678 .ok_or_else(|| {
679 RepositoryError::Runtime(RuntimeError::Graph(format!(
680 "parent {} missing local key {} for relation {}",
681 node.entity, relation.local_key, name
682 )))
683 })?;
684 let child_repo = self.scoped_repository(relation.target_entity.clone());
685 let child_descriptor = self
686 .repository
687 .metadata
688 .context
689 .require_entity(&relation.target_entity)
690 .map_err(RepositoryError::Runtime)?;
691 let child_id_property = child_descriptor.id_property().ok_or_else(|| {
692 RepositoryError::Runtime(RuntimeError::Graph(format!(
693 "entity {} has no id property",
694 relation.target_entity
695 )))
696 })?;
697
698 let mut seen = std::collections::BTreeSet::new();
699 let mut saved_children = Vec::new();
700 for mut child in children {
701 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
702 if relation.attach && child.operation != GraphOperation::Reference {
703 child
704 .values
705 .insert(relation.foreign_key.clone(), local_value.clone());
706 }
707 if let Some(child_id) = child
708 .values
709 .get(&child_id_property.name)
710 .filter(|value| !is_unassigned_id_value(value))
711 {
712 let key = graph_identity_key(child_id);
713 if !seen.insert(key.clone()) {
714 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
715 "duplicate child id {key} in relation {}.{}",
716 node.entity, name
717 ))));
718 }
719 }
720 saved_children.push(child_repo.upsert_graph_node_scoped(child, active_scope).await?);
721 }
722
723
724
725 node.relations.insert(name, saved_children);
726 }
727
728 Ok(node)
729 })
730 }
731
732 async fn validate_reference_node(
733 &self,
734 node: GraphNode,
735 trace_chain: Vec<teaql_core::TraceNode>,
736 ) -> Result<GraphNode, RepositoryError<E::Error>> {
737 if !node.relations.is_empty() {
738 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
739 "reference node {} cannot contain child relations",
740 node.entity
741 ))));
742 }
743 let descriptor = self
744 .repository
745 .metadata
746 .context
747 .require_entity(&node.entity)
748 .map_err(RepositoryError::Runtime)?;
749 let id_property = descriptor.id_property().ok_or_else(|| {
750 RepositoryError::Runtime(RuntimeError::Graph(format!(
751 "entity {} has no id property for graph reference",
752 node.entity
753 )))
754 })?;
755 let id = node
756 .values
757 .get(&id_property.name)
758 .filter(|value| !is_unassigned_id_value(value))
759 .cloned()
760 .ok_or_else(|| {
761 RepositoryError::Runtime(RuntimeError::Graph(format!(
762 "reference node {} missing id property {}",
763 node.entity, id_property.name
764 )))
765 })?;
766
767 for field in node.values.keys() {
768 if field == &id_property.name {
769 continue;
770 }
771 if descriptor
772 .version_property()
773 .map(|property| field == &property.name)
774 .unwrap_or(false)
775 {
776 continue;
777 }
778 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
779 "reference node {} cannot carry mutable field {}",
780 node.entity, field
781 ))));
782 }
783
784 let current = self
785 .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain).await?
786 .ok_or_else(|| {
787 RepositoryError::Runtime(RuntimeError::Graph(format!(
788 "reference node {}({}) does not exist",
789 node.entity,
790 graph_identity_key(&id)
791 )))
792 })?;
793
794 if let Some(version_property) = descriptor.version_property() {
795 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
796 if *existing_version < 0 {
797 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
798 "reference node {}({}) is deleted",
799 node.entity,
800 graph_identity_key(&id)
801 ))));
802 }
803 if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
804 {
805 if expected_version != existing_version {
806 return Err(RepositoryError::Runtime(
807 RuntimeError::OptimisticLockConflict {
808 entity: node.entity,
809 id: graph_identity_key(&id),
810 },
811 ));
812 }
813 }
814 }
815 }
816
817 Ok(GraphNode {
818 entity: node.entity,
819 values: current,
820 relations: BTreeMap::new(),
821 operation: GraphOperation::Reference,
822 comment: None,
823 dirty_fields: None, original_values: None,
824 })
825 }
826
827 async fn validate_remove_node(&self, node: &GraphNode, trace_chain: Vec<teaql_core::TraceNode>) -> Result<(), RepositoryError<E::Error>> {
828 if !node.relations.is_empty() {
829 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
830 "remove node {} cannot contain child relations",
831 node.entity
832 ))));
833 }
834 let descriptor = self
835 .repository
836 .metadata
837 .context
838 .require_entity(&node.entity)
839 .map_err(RepositoryError::Runtime)?;
840 let id_property = descriptor.id_property().ok_or_else(|| {
841 RepositoryError::Runtime(RuntimeError::Graph(format!(
842 "entity {} has no id property for graph remove",
843 node.entity
844 )))
845 })?;
846 let id = node
847 .values
848 .get(&id_property.name)
849 .filter(|value| !is_unassigned_id_value(value))
850 .cloned()
851 .ok_or_else(|| {
852 RepositoryError::Runtime(RuntimeError::Graph(format!(
853 "remove node {} missing id property {}",
854 node.entity, id_property.name
855 )))
856 })?;
857 let current = self
858 .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain).await?
859 .ok_or_else(|| {
860 RepositoryError::Runtime(RuntimeError::Graph(format!(
861 "remove node {}({}) does not exist",
862 node.entity,
863 graph_identity_key(&id)
864 )))
865 })?;
866 if let Some(version_property) = descriptor.version_property() {
867 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
868 if *existing_version < 0 {
869 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
870 "remove node {}({}) is already deleted",
871 node.entity,
872 graph_identity_key(&id)
873 ))));
874 }
875 }
876 }
877 Ok(())
878 }
879
880 fn graph_node_from_record(
881 &self,
882 entity: &str,
883 record: Record,
884 ) -> Result<GraphNode, RuntimeError> {
885 let descriptor = self.repository.metadata.context.require_entity(entity)?;
886 let mut node = GraphNode::new(entity);
887
888 for (field, value) in record {
889 if field == "_comment" {
890 if let Value::Text(comment) = value {
891 node.set_comment(comment);
892 }
893 continue;
894 }
895 let Some(relation) = descriptor.relation_by_name(&field) else {
896 node.values.insert(field, value);
897 continue;
898 };
899
900 match value {
901 Value::Null => {
902 node.relations.entry(field).or_default();
903 }
904 Value::Object(record) => {
905 let child = self.graph_node_from_record(&relation.target_entity, record)?;
906 node.relations.entry(field).or_default().push(child);
907 }
908 Value::List(values) => {
909 let children = node.relations.entry(field.clone()).or_default();
910 for value in values {
911 let Value::Object(record) = value else {
912 return Err(RuntimeError::Graph(format!(
913 "relation {}.{} expects object children, got {:?}",
914 entity, field, value
915 )));
916 };
917 children
918 .push(self.graph_node_from_record(&relation.target_entity, record)?);
919 }
920 }
921 other => {
922 return Err(RuntimeError::Graph(format!(
923 "relation {}.{} expects object/list/null, got {:?}",
924 entity, field, other
925 )));
926 }
927 }
928 }
929
930 Ok(node)
931 }
932
933 fn graph_update_command(
934 &self,
935 node: &mut GraphNode,
936 descriptor: &EntityDescriptor,
937 id_property: &PropertyDescriptor,
938 id: &Value,
939 ) -> Result<UpdateCommand, RepositoryError<E::Error>> {
940 crate::mark_record_status(&mut node.values, crate::CheckObjectStatus::Update);
941 let check_result = self
942 .repository
943 .metadata
944 .context
945 .check_and_fix_record(&node.entity, &mut node.values);
946 crate::clear_record_status(&mut node.values);
947 check_result.map_err(RepositoryError::Runtime)?;
948
949 let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
950 command.old_values = node.original_values.clone();
951 if let Some(version_property) = descriptor.version_property() {
952 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
953 command = command.expected_version(*version);
954 }
955 }
956 for property in descriptor.properties.iter().filter(|property| {
960 !property.is_id
961 && !property.is_version
962 && property.name != id_property.name
963 && match &node.dirty_fields {
964 Some(dirty) => dirty.contains(&property.name),
965 None => node.values.contains_key(&property.name),
966 }
967 }) {
968 if let Some(value) = node.values.get(&property.name) {
969 command.values.insert(property.name.clone(), value.clone());
970 }
971 }
972 Ok(command)
973 }
974
975 fn delete_graph_node<'b, 's: 'b>(
976 &'b self,
977 node: &'b GraphNode,
978 parent_scope: Option<&'s ScopedCommentNode<'s>>,
979 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, RepositoryError<E::Error>>> + Send + '_>> {
980 Box::pin(async move {
981 let descriptor = self
982 .repository
983 .metadata
984 .context
985 .require_entity(&node.entity)
986 .map_err(RepositoryError::Runtime)?;
987 let id_property = descriptor.id_property().ok_or_else(|| {
988 RepositoryError::Runtime(RuntimeError::Graph(format!(
989 "entity {} has no id property for graph remove",
990 node.entity
991 )))
992 })?;
993 let id = node
994 .values
995 .get(&id_property.name)
996 .filter(|value| !is_unassigned_id_value(value))
997 .cloned()
998 .ok_or_else(|| {
999 RepositoryError::Runtime(RuntimeError::Graph(format!(
1000 "remove node {} missing id property {}",
1001 node.entity, id_property.name
1002 )))
1003 })?;
1004 let mut delete = DeleteCommand::new(node.entity.clone(), id);
1005 if let Some(version_property) = descriptor.version_property() {
1006 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
1007 delete = delete.expected_version(*version);
1008 }
1009 }
1010
1011 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
1013 parent: parent_scope,
1014 track: teaql_core::TraceNode {
1015 entity_type: node.entity.clone(),
1016 entity_id: node
1017 .id()
1018 .and_then(|v| match v {
1019 Value::U64(n) => Some(*n),
1020 Value::I64(n) => Some(*n as u64),
1021 _ => None,
1022 }),
1023 comment: c.clone(),
1024 },
1025 });
1026 let active_scope = current_scope.as_ref().or(parent_scope);
1027 let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
1028
1029 self.delete_scoped(&delete, lineage).await
1030 })
1031 }
1032
1033 pub async fn fetch_graph_current_row(
1034 &self,
1035 entity: &str,
1036 id_property: &str,
1037 id: &Value,
1038 trace_chain: Vec<teaql_core::TraceNode>,
1039 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
1040 let mut query = SelectQuery::new(entity).filter(Expr::eq(id_property, id.clone()));
1041 query.trace_chain = trace_chain;
1042 let mut rows = self
1043 .scoped_repository(entity.to_owned())
1044 .fetch_all(&query).await?;
1045 Ok(rows.pop())
1046 }
1047
1048 async fn fetch_graph_children(
1049 &self,
1050 entity: &str,
1051 foreign_key: &str,
1052 parent_value: &Value,
1053 trace_chain: Vec<teaql_core::TraceNode>,
1054 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
1055 let mut query = SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone()));
1056 query.trace_chain = trace_chain;
1057 self.scoped_repository(entity.to_owned()).fetch_all(&query).await
1058 }
1059}