1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use teaql_core::{
5 DeleteCommand, Entity, EntityDescriptor, Expr, InsertCommand, PropertyDescriptor, Record,
6 SelectQuery, UpdateCommand, Value,
7};
8
9use crate::{
10 GraphMutationKind, GraphMutationPlan, GraphNode, GraphOperation, RepositoryError,
11 RuntimeError, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
12};
13use crate::entity_status::EntityStatus;
14
15use super::{ResolvedRepository, helpers::*};
16
17impl<'a, E> ResolvedRepository<'a, E>
18where
19 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
20{
21 pub async fn save_graph(&self, node: GraphNode) -> Result<GraphNode, RepositoryError<E::Error>> {
22 if node.entity != self.entity {
23 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
24 "resolved repository {} cannot save graph root {}",
25 self.entity, node.entity
26 ))));
27 }
28 let plan = self.plan_graph(node).await?;
29 self.execute_graph_plan(plan).await
30 }
31
32 pub async fn save_entity_graph_from(&self, graph: teaql_core::EntityGraph) -> Result<GraphNode, RepositoryError<E::Error>> {
33 fn convert(node: teaql_core::EntityGraphNode) -> GraphNode {
34 let mut relations = BTreeMap::new();
35 for (rel_name, child) in node.children {
36 relations.entry(rel_name).or_insert_with(Vec::new).push(convert(child));
37 }
38 GraphNode {
39 entity: node.entity_type,
40 values: node.record,
41 relations,
42 operation: match node.operation {
43 teaql_core::EntityGraphOperation::Save => crate::GraphOperation::Upsert,
44 teaql_core::EntityGraphOperation::Delete => crate::GraphOperation::Remove,
45 },
46 comment: node.comment,
47 dirty_fields: None, original_values: None,
48 }
49 }
50 self.save_graph(convert(graph.root)).await
51 }
52
53 pub async fn save_entity_graph<T>(&self, entity: T) -> Result<GraphNode, RepositoryError<E::Error>>
54 where
55 T: Entity,
56 {
57 let node = self
58 .graph_node_from_entity(entity)
59 .map_err(RepositoryError::Runtime)?;
60 self.save_graph(node).await
61 }
62
63 pub async fn save_entity<T>(&self, entity: T, status: EntityStatus) -> Result<GraphNode, RepositoryError<E::Error>>
64 where
65 T: Entity,
66 {
67 if !status.need_persist() {
68 return Ok(GraphNode::new(&self.entity));
69 }
70 if status.is_deleted() {
71 let mut node = self.graph_node_from_entity(entity)
72 .map_err(RepositoryError::Runtime)?;
73 node.operation = GraphOperation::Remove;
74 node.relations.clear();
75 self.save_graph(node).await
76 } else {
77 self.save_entity_graph(entity).await
78 }
79 }
80 pub async fn save_entity_with_comment<T>(&self, entity: T, status: EntityStatus, comment: impl Into<String>) -> Result<GraphNode, RepositoryError<E::Error>>
81 where
82 T: Entity,
83 {
84 if status.is_deleted() {
85 let mut node = self.graph_node_from_entity(entity)
86 .map_err(RepositoryError::Runtime)?;
87 node.operation = GraphOperation::Remove;
88 node.relations.clear();
89 node.set_comment(comment);
90 self.save_graph(node).await
91 } else {
92 self.save_entity_graph_with_comment(entity, comment).await
93 }
94 }
95 pub async fn save_entity_graph_with_comment<T>(
96 &self,
97 entity: T,
98 comment: impl Into<String>,
99 ) -> Result<GraphNode, RepositoryError<E::Error>>
100 where
101 T: Entity,
102 {
103 let mut node = self
104 .graph_node_from_entity(entity)
105 .map_err(RepositoryError::Runtime)?;
106 node.set_comment(comment);
107 self.save_graph(node).await
108 }
109
110 pub async fn create_entity_graph_with_comment<T>(
114 &self,
115 entity: T,
116 comment: impl Into<String>,
117 ) -> Result<GraphNode, RepositoryError<E::Error>>
118 where
119 T: Entity,
120 {
121 let mut node = self
122 .graph_node_from_entity(entity)
123 .map_err(RepositoryError::Runtime)?;
124 node.operation = GraphOperation::Create;
125 node.set_comment(comment);
126 self.save_graph(node).await
127 }
128
129 pub async fn plan_graph(
130 &self,
131 node: GraphNode,
132 ) -> Result<GraphMutationPlan, RepositoryError<E::Error>> {
133 if node.entity != self.entity {
134 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
135 "resolved repository {} cannot plan graph root {}",
136 self.entity, node.entity
137 ))));
138 }
139 let mut node = node;
140 let mut plan = GraphMutationPlan::default();
141 self.collect_graph_plan(&mut node, &mut plan, None, None, false).await?;
142 plan.planned_root = Some(node);
143 plan.rebuild_batches();
144 Ok(plan)
145 }
146
147 pub async fn execute_graph_plan(
148 &self,
149 plan: GraphMutationPlan,
150 ) -> Result<GraphNode, RepositoryError<E::Error>> {
151 let Some(root) = plan.planned_root else {
152 return Err(RepositoryError::Runtime(RuntimeError::Graph(
153 "graph mutation plan has no planned root".to_owned(),
154 )));
155 };
156
157 for batch in plan.batches {
158 if batch.items.is_empty() || (matches!(batch.kind, GraphMutationKind::Update) && batch.update_fields.is_empty()) {
159 continue;
160 }
161 match batch.kind {
162 GraphMutationKind::Create => {
163 let mut cmd = teaql_core::BatchInsertCommand::new(&batch.entity);
164 for item in batch.items {
165 cmd.batch_values.push(item.values);
166 if let Some(token) = item.scope_token {
167 cmd.trace_chains.push(token.recover_trace_chain());
168 } else {
169 cmd.trace_chains.push(Vec::new());
170 }
171 }
172 self.execute_prepared_batch_insert(cmd).await?;
173 }
174 GraphMutationKind::Update => {
175 if batch.update_fields.is_empty() {
176 continue;
177 }
178 let mut cmd = teaql_core::BatchUpdateCommand::new(&batch.entity, batch.update_fields);
179 for item in batch.items {
180 let id = item.values.get("id").cloned().ok_or_else(|| {
181 RepositoryError::Runtime(RuntimeError::Graph(format!(
182 "update item in batch missing id for {}", batch.entity
183 )))
184 })?;
185 let version = item.values.get("version").and_then(|v| {
186 if let teaql_core::Value::I64(n) = v { Some(*n) } else { None }
187 });
188 cmd.batch_values.push(item.values);
189 cmd.batch_ids.push(id);
190 cmd.batch_expected_versions.push(version);
191 cmd.batch_old_values.push(item.old_values);
192 if let Some(token) = item.scope_token {
193 cmd.trace_chains.push(token.recover_trace_chain());
194 } else {
195 cmd.trace_chains.push(Vec::new());
196 }
197 }
198 self.execute_prepared_batch_update(cmd).await?;
199 }
200 GraphMutationKind::Delete => {
201 for item in batch.items {
203 let id = item.values.get("id").cloned().ok_or_else(|| {
204 RepositoryError::Runtime(RuntimeError::Graph(format!(
205 "delete item in batch missing id for {}", batch.entity
206 )))
207 })?;
208 let mut cmd = teaql_core::DeleteCommand::new(&batch.entity, id);
209 if let Some(teaql_core::Value::I64(version)) = item.values.get("version") {
210 cmd = cmd.expected_version(*version);
211 }
212 let trace_chain = if let Some(token) = item.scope_token { token.recover_trace_chain() } else { Vec::new() };
213 self.delete_scoped(&cmd, trace_chain).await?;
214 }
215 }
216 GraphMutationKind::Reference => {
217 }
219 }
220 }
221
222 Ok(root)
223 }
224
225 pub fn graph_node_from_entity<T>(&self, entity: T) -> Result<GraphNode, RuntimeError>
226 where
227 T: Entity,
228 {
229 let descriptor = T::entity_descriptor();
230 if descriptor.name != self.entity {
231 return Err(RuntimeError::Graph(format!(
232 "resolved repository {} cannot extract graph root {}",
233 self.entity, descriptor.name
234 )));
235 }
236 let dirty_fields = entity.dirty_fields();
239 let original_values = entity.original_values();
240 let is_deleted = entity.is_marked_as_delete();
241 let comment = entity.get_comment();
242 let mut node = self.graph_node_from_record(&descriptor.name, entity.into_record())?;
243 node.dirty_fields = dirty_fields;
244 node.original_values = original_values;
245 if is_deleted {
246 node.operation = GraphOperation::Remove;
247 node.relations.clear();
248 }
249 if let Some(c) = comment {
250 node.set_comment(c);
251 }
252 Ok(node)
253 }
254
255 fn collect_graph_plan<'b, 's: 'b>(
256 &'b self,
257 node: &'b mut GraphNode,
258 plan: &'b mut GraphMutationPlan,
259 parent_scope: Option<&'s ScopedCommentNode<'s>>,
260 parent_token: Option<Arc<TraceScopeToken>>,
261 parent_is_create: bool,
262 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), RepositoryError<E::Error>>> + Send + '_>> {
263 Box::pin(async move {
264 match node.operation {
265 GraphOperation::Reference => {
266 plan.push(
267 node.entity.clone(),
268 GraphMutationKind::Reference,
269 node.values.clone(),
270 Vec::new(),
271 parent_token,
272 node.original_values.clone(),
273 );
274 return Ok(());
275 }
276 GraphOperation::Remove => {
277 plan.push(
278 node.entity.clone(),
279 GraphMutationKind::Delete,
280 node.values.clone(),
281 Vec::new(),
282 parent_token,
283 node.original_values.clone(),
284 );
285 return Ok(());
286 }
287 GraphOperation::Upsert | GraphOperation::Create => {}
288 }
289
290 let descriptor = self
291 .repository
292 .metadata
293 .context
294 .require_entity(&node.entity)
295 .map_err(RepositoryError::Runtime)?;
296
297 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
299 parent: parent_scope,
300 track: teaql_core::TraceNode {
301 entity_type: node.entity.clone(),
302 entity_id: node.id().and_then(|v| match v {
303 Value::U64(n) => Some(*n),
304 Value::I64(n) => Some(*n as u64),
305 _ => None,
306 }),
307 comment: c.clone(),
308 },
309 });
310 let active_scope = current_scope.as_ref().or(parent_scope);
311
312 let id_property = descriptor.id_property().cloned();
313 let id = id_property.as_ref().and_then(|property| {
314 node.values
315 .get(&property.name)
316 .filter(|value| !is_unassigned_id_value(value))
317 .cloned()
318 });
319
320 if let Some(id_val) = &id {
321 if !plan.visited_nodes.insert((node.entity.clone(), graph_identity_key(id_val))) {
322 return Ok(());
323 }
324 }
325
326 let is_create_op = node.operation == GraphOperation::Create || (parent_is_create && node.operation == GraphOperation::Upsert);
327
328 let is_update = if is_create_op {
329 false
330 } else {
331 match (id_property.as_ref(), id.as_ref()) {
332 (Some(id_property), Some(id)) => self
333 .fetch_graph_current_row(&node.entity, &id_property.name, id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?
334 .is_some(),
335 _ => false,
336 }
337 };
338 if !is_update {
339 if let Some(id_property) = id_property.as_ref() {
340 let needs_id = !node.values.contains_key(&id_property.name)
341 || node
342 .values
343 .get(&id_property.name)
344 .is_some_and(is_unassigned_id_value);
345 if needs_id {
346 let id = self
347 .repository
348 .metadata
349 .context
350 .next_id(&node.entity)
351 .map_err(RepositoryError::Runtime)?;
352 node.values.insert(id_property.name.clone(), Value::U64(id));
353 }
354 }
355 ensure_initial_version(&mut node.values, descriptor);
356 }
357 let update_fields = if is_update {
358 let mut excluded = Vec::new();
359 if let Some(id_property) = id_property.as_ref() {
360 excluded.push(id_property.name.clone());
361 }
362 if let Some(version_property) = descriptor.version_property() {
363 excluded.push(version_property.name.clone());
364 }
365 let mut fields = sorted_update_fields(&node.values, excluded);
366 if let Some(dirty) = &node.dirty_fields {
367 fields.retain(|f| dirty.contains(f));
368 }
369 fields
370 } else {
371 Vec::new()
372 };
373
374 let current_token = if let Some(c) = &node.comment {
377 Some(Arc::new(TraceScopeToken {
378 parent: parent_token.clone(),
379 track: teaql_core::TraceNode {
380 entity_type: node.entity.clone(),
381 entity_id: node.id().and_then(|v| match v {
382 Value::U64(n) => Some(*n),
383 Value::I64(n) => Some(*n as u64),
384 _ => None,
385 }),
386 comment: c.clone(),
387 },
388 node_index: plan.next_item_index,
389 }))
390 } else {
391 parent_token.clone()
392 };
393
394 plan.push(
395 node.entity.clone(),
396 if is_update {
397 GraphMutationKind::Update
398 } else {
399 GraphMutationKind::Create
400 },
401 node.values.clone(),
402 update_fields,
403 current_token.clone(),
404 node.original_values.clone(),
405 );
406
407 for (name, children) in &mut node.relations {
408 let relation = descriptor.relation_by_name(name).ok_or_else(|| {
409 RepositoryError::Runtime(RuntimeError::MissingRelation {
410 entity: node.entity.clone(),
411 relation: name.clone(),
412 })
413 })?;
414 let child_repo = self.scoped_repository(relation.target_entity.clone());
415 for child in children {
416 ensure_relation_target(&node.entity, name, &relation.target_entity, child)?;
417 child_repo.collect_graph_plan(child, plan, active_scope, current_token.clone(), is_create_op).await?;
418 }
419 }
420 Ok(())
421 })
422 }
423
424 fn insert_graph_node_scoped<'b, 's: 'b>(
425 &'b self,
426 mut node: GraphNode,
427 parent_scope: Option<&'s ScopedCommentNode<'s>>,
428 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<GraphNode, RepositoryError<E::Error>>> + Send + '_>> {
429 Box::pin(async move {
430 match node.operation {
431 GraphOperation::Upsert | GraphOperation::Create => {}
432 GraphOperation::Reference => return self.validate_reference_node(node, parent_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await,
433 GraphOperation::Remove => {
434 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
435 "create graph cannot remove node {}",
436 node.entity
437 ))));
438 }
439 }
440
441 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
443 parent: parent_scope,
444 track: teaql_core::TraceNode {
445 entity_type: node.entity.clone(),
446 entity_id: node
447 .id()
448 .and_then(|v| match v {
449 Value::U64(n) => Some(*n),
450 Value::I64(n) => Some(*n as u64),
451 _ => None,
452 }),
453 comment: c.clone(),
454 },
455 });
456 let active_scope = current_scope.as_ref().or(parent_scope);
457
458 let descriptor = self
459 .repository
460 .metadata
461 .context
462 .require_entity(&node.entity)
463 .map_err(RepositoryError::Runtime)?;
464
465 let mut one_relations = Vec::new();
466 let mut many_relations = Vec::new();
467 for (name, children) in std::mem::take(&mut node.relations) {
468 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
469 RepositoryError::Runtime(RuntimeError::MissingRelation {
470 entity: node.entity.clone(),
471 relation: name.clone(),
472 })
473 })?;
474 if relation.many {
475 many_relations.push((name, relation.clone(), children));
476 } else {
477 one_relations.push((name, relation.clone(), children));
478 }
479 }
480
481 for (name, relation, children) in one_relations {
482 if children.len() > 1 {
483 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
484 "relation {}.{} expects one child, got {}",
485 node.entity,
486 name,
487 children.len()
488 ))));
489 }
490 let mut saved_children = Vec::new();
491 for child in children {
492 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
493 let child_repo = self.scoped_repository(child.entity.clone());
494 let saved_child = child_repo.insert_graph_node_scoped(child, active_scope).await?;
495 if relation.attach {
496 let foreign_value = saved_child
497 .values
498 .get(&relation.foreign_key)
499 .cloned()
500 .ok_or_else(|| {
501 RepositoryError::Runtime(RuntimeError::Graph(format!(
502 "saved child {} missing foreign key {} for relation {}.{}",
503 relation.target_entity, relation.foreign_key, node.entity, name
504 )))
505 })?;
506 node.values
507 .insert(relation.local_key.clone(), foreign_value);
508 }
509 saved_children.push(saved_child);
510 }
511 node.relations.insert(name, saved_children);
512 }
513
514 let command = self
515 .prepare_insert_command(&InsertCommand {
516 entity: node.entity.clone(),
517 values: node.values.clone(),
518 trace_chain: Vec::new(),
519 })
520 .map_err(RepositoryError::Runtime)?;
521 let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
522 self.execute_prepared_insert_with_comment(command.clone(), lineage).await?;
523 node.values = command.values;
524
525 for (name, relation, children) in many_relations {
526 let local_value = node
527 .values
528 .get(&relation.local_key)
529 .cloned()
530 .ok_or_else(|| {
531 RepositoryError::Runtime(RuntimeError::Graph(format!(
532 "parent {} missing local key {} for relation {}",
533 node.entity, relation.local_key, name
534 )))
535 })?;
536 let mut saved_children = Vec::new();
537 for mut child in children {
538 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
539 if relation.attach {
540 child
541 .values
542 .insert(relation.foreign_key.clone(), local_value.clone());
543 }
544 let child_repo = self.scoped_repository(child.entity.clone());
545 saved_children.push(child_repo.insert_graph_node_scoped(child, active_scope).await?);
546 }
547 node.relations.insert(name, saved_children);
548 }
549
550 Ok(node)
551 })
552 }
553
554 fn upsert_graph_node_scoped<'b, 's: 'b>(
555 &'b self,
556 mut node: GraphNode,
557 parent_scope: Option<&'s ScopedCommentNode<'s>>,
558 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<GraphNode, RepositoryError<E::Error>>> + Send + '_>> {
559 Box::pin(async move {
560 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
562 parent: parent_scope,
563 track: teaql_core::TraceNode {
564 entity_type: node.entity.clone(),
565 entity_id: node
566 .id()
567 .and_then(|v| match v {
568 Value::U64(n) => Some(*n),
569 Value::I64(n) => Some(*n as u64),
570 _ => None,
571 }),
572 comment: c.clone(),
573 },
574 });
575 let active_scope = current_scope.as_ref().or(parent_scope);
576
577 match node.operation {
578 GraphOperation::Upsert | GraphOperation::Create => {}
579 GraphOperation::Reference => return self.validate_reference_node(node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await,
580 GraphOperation::Remove => {
581 self.validate_remove_node(&node, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?;
582 self.delete_graph_node(&node, parent_scope).await?;
583 return Ok(node);
584 }
585 }
586
587 let descriptor = self
588 .repository
589 .metadata
590 .context
591 .require_entity(&node.entity)
592 .map_err(RepositoryError::Runtime)?;
593 let Some(id_property) = descriptor.id_property() else {
594 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
595 "entity {} has no id property for graph upsert",
596 node.entity
597 ))));
598 };
599 let Some(id) = node
600 .values
601 .get(&id_property.name)
602 .filter(|value| !is_unassigned_id_value(value))
603 .cloned()
604 else {
605 node.comment = None;
607 return self.insert_graph_node_scoped(node, active_scope).await;
608 };
609
610 if node.operation == GraphOperation::Create || self
611 .fetch_graph_current_row(&node.entity, &id_property.name, &id, active_scope.map(|s| s.to_trace_chain()).unwrap_or_default()).await?
612 .is_none()
613 {
614 node.comment = None;
615 return self.insert_graph_node_scoped(node, active_scope).await;
616 }
617
618 let mut one_relations = Vec::new();
619 let mut many_relations = Vec::new();
620 for (name, children) in std::mem::take(&mut node.relations) {
621 let relation = descriptor.relation_by_name(&name).ok_or_else(|| {
622 RepositoryError::Runtime(RuntimeError::MissingRelation {
623 entity: node.entity.clone(),
624 relation: name.clone(),
625 })
626 })?;
627 if relation.many {
628 many_relations.push((name, relation.clone(), children));
629 } else {
630 one_relations.push((name, relation.clone(), children));
631 }
632 }
633
634 for (name, relation, children) in one_relations {
635 if children.len() > 1 {
636 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
637 "relation {}.{} expects one child, got {}",
638 node.entity,
639 name,
640 children.len()
641 ))));
642 }
643 let mut saved_children = Vec::new();
644 for child in children {
645 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
646 let child_repo = self.scoped_repository(child.entity.clone());
647 let saved_child = child_repo.upsert_graph_node_scoped(child, active_scope).await?;
648 if relation.attach {
649 let foreign_value = saved_child
650 .values
651 .get(&relation.foreign_key)
652 .cloned()
653 .ok_or_else(|| {
654 RepositoryError::Runtime(RuntimeError::Graph(format!(
655 "saved child {} missing foreign key {} for relation {}.{}",
656 relation.target_entity, relation.foreign_key, node.entity, name
657 )))
658 })?;
659 node.values
660 .insert(relation.local_key.clone(), foreign_value);
661 }
662 saved_children.push(saved_child);
663 }
664 node.relations.insert(name, saved_children);
665 }
666
667 let update = self.graph_update_command(&mut node, descriptor, id_property, &id)?;
668 if !update.values.is_empty() {
669 let prepared_update = self
670 .prepare_update_command(&update)
671 .map_err(RepositoryError::Runtime)?;
672 let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
673 self.execute_prepared_update_with_comment(prepared_update.clone(), lineage).await?;
674 for (field, value) in &prepared_update.values {
675 node.values.insert(field.clone(), value.clone());
676 }
677 if let Some(version_property) = descriptor.version_property() {
678 if let Some(expected_version) = prepared_update.expected_version {
679 node.values.insert(
680 version_property.name.clone(),
681 Value::I64(expected_version + 1),
682 );
683 }
684 }
685 }
686
687 for (name, relation, children) in many_relations {
688 let local_value = node
689 .values
690 .get(&relation.local_key)
691 .cloned()
692 .ok_or_else(|| {
693 RepositoryError::Runtime(RuntimeError::Graph(format!(
694 "parent {} missing local key {} for relation {}",
695 node.entity, relation.local_key, name
696 )))
697 })?;
698 let child_repo = self.scoped_repository(relation.target_entity.clone());
699 let child_descriptor = self
700 .repository
701 .metadata
702 .context
703 .require_entity(&relation.target_entity)
704 .map_err(RepositoryError::Runtime)?;
705 let child_id_property = child_descriptor.id_property().ok_or_else(|| {
706 RepositoryError::Runtime(RuntimeError::Graph(format!(
707 "entity {} has no id property",
708 relation.target_entity
709 )))
710 })?;
711
712 let mut seen = std::collections::BTreeSet::new();
713 let mut saved_children = Vec::new();
714 for mut child in children {
715 ensure_relation_target(&node.entity, &name, &relation.target_entity, &child)?;
716 if relation.attach && child.operation != GraphOperation::Reference {
717 child
718 .values
719 .insert(relation.foreign_key.clone(), local_value.clone());
720 }
721 if let Some(child_id) = child
722 .values
723 .get(&child_id_property.name)
724 .filter(|value| !is_unassigned_id_value(value))
725 {
726 let key = graph_identity_key(child_id);
727 if !seen.insert(key.clone()) {
728 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
729 "duplicate child id {key} in relation {}.{}",
730 node.entity, name
731 ))));
732 }
733 }
734 saved_children.push(child_repo.upsert_graph_node_scoped(child, active_scope).await?);
735 }
736
737
738
739 node.relations.insert(name, saved_children);
740 }
741
742 Ok(node)
743 })
744 }
745
746 async fn validate_reference_node(
747 &self,
748 node: GraphNode,
749 trace_chain: Vec<teaql_core::TraceNode>,
750 ) -> Result<GraphNode, RepositoryError<E::Error>> {
751 if !node.relations.is_empty() {
752 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
753 "reference node {} cannot contain child relations",
754 node.entity
755 ))));
756 }
757 let descriptor = self
758 .repository
759 .metadata
760 .context
761 .require_entity(&node.entity)
762 .map_err(RepositoryError::Runtime)?;
763 let id_property = descriptor.id_property().ok_or_else(|| {
764 RepositoryError::Runtime(RuntimeError::Graph(format!(
765 "entity {} has no id property for graph reference",
766 node.entity
767 )))
768 })?;
769 let id = node
770 .values
771 .get(&id_property.name)
772 .filter(|value| !is_unassigned_id_value(value))
773 .cloned()
774 .ok_or_else(|| {
775 RepositoryError::Runtime(RuntimeError::Graph(format!(
776 "reference node {} missing id property {}",
777 node.entity, id_property.name
778 )))
779 })?;
780
781 for field in node.values.keys() {
782 if field == &id_property.name {
783 continue;
784 }
785 if descriptor
786 .version_property()
787 .map(|property| field == &property.name)
788 .unwrap_or(false)
789 {
790 continue;
791 }
792 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
793 "reference node {} cannot carry mutable field {}",
794 node.entity, field
795 ))));
796 }
797
798 let current = self
799 .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain).await?
800 .ok_or_else(|| {
801 RepositoryError::Runtime(RuntimeError::Graph(format!(
802 "reference node {}({}) does not exist",
803 node.entity,
804 graph_identity_key(&id)
805 )))
806 })?;
807
808 if let Some(version_property) = descriptor.version_property() {
809 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
810 if *existing_version < 0 {
811 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
812 "reference node {}({}) is deleted",
813 node.entity,
814 graph_identity_key(&id)
815 ))));
816 }
817 if let Some(Value::I64(expected_version)) = node.values.get(&version_property.name)
818 {
819 if expected_version != existing_version {
820 println!("OptimisticLockConflict in validate_reference_node! entity={}, expected={}, existing={}", node.entity, expected_version, existing_version);
821 return Err(RepositoryError::Runtime(
822 RuntimeError::OptimisticLockConflict {
823 entity: node.entity,
824 id: graph_identity_key(&id),
825 },
826 ));
827 }
828 }
829 }
830 }
831
832 Ok(GraphNode {
833 entity: node.entity,
834 values: current,
835 relations: BTreeMap::new(),
836 operation: GraphOperation::Reference,
837 comment: None,
838 dirty_fields: None, original_values: None,
839 })
840 }
841
842 async fn validate_remove_node(&self, node: &GraphNode, trace_chain: Vec<teaql_core::TraceNode>) -> Result<(), RepositoryError<E::Error>> {
843 if !node.relations.is_empty() {
844 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
845 "remove node {} cannot contain child relations",
846 node.entity
847 ))));
848 }
849 let descriptor = self
850 .repository
851 .metadata
852 .context
853 .require_entity(&node.entity)
854 .map_err(RepositoryError::Runtime)?;
855 let id_property = descriptor.id_property().ok_or_else(|| {
856 RepositoryError::Runtime(RuntimeError::Graph(format!(
857 "entity {} has no id property for graph remove",
858 node.entity
859 )))
860 })?;
861 let id = node
862 .values
863 .get(&id_property.name)
864 .filter(|value| !is_unassigned_id_value(value))
865 .cloned()
866 .ok_or_else(|| {
867 RepositoryError::Runtime(RuntimeError::Graph(format!(
868 "remove node {} missing id property {}",
869 node.entity, id_property.name
870 )))
871 })?;
872 let current = self
873 .fetch_graph_current_row(&node.entity, &id_property.name, &id, trace_chain).await?
874 .ok_or_else(|| {
875 RepositoryError::Runtime(RuntimeError::Graph(format!(
876 "remove node {}({}) does not exist",
877 node.entity,
878 graph_identity_key(&id)
879 )))
880 })?;
881 if let Some(version_property) = descriptor.version_property() {
882 if let Some(Value::I64(existing_version)) = current.get(&version_property.name) {
883 if *existing_version < 0 {
884 return Err(RepositoryError::Runtime(RuntimeError::Graph(format!(
885 "remove node {}({}) is already deleted",
886 node.entity,
887 graph_identity_key(&id)
888 ))));
889 }
890 }
891 }
892 Ok(())
893 }
894
895 fn graph_node_from_record(
896 &self,
897 entity: &str,
898 record: Record,
899 ) -> Result<GraphNode, RuntimeError> {
900 let descriptor = self.repository.metadata.context.require_entity(entity)?;
901 let mut node = GraphNode::new(entity);
902
903 for (field, value) in record {
904 if field == "_comment" {
905 if let Value::Text(comment) = value {
906 node.set_comment(comment);
907 }
908 continue;
909 }
910 if field == "_dirty_fields" {
911 if let Value::List(fields) = value {
912 let mut dirty = std::collections::BTreeSet::new();
913 for f in fields {
914 if let Value::Text(t) = f {
915 dirty.insert(t);
916 }
917 }
918 node.dirty_fields = Some(dirty);
919 }
920 continue;
921 }
922 if field == "_original_values" {
923 if let Value::Object(orig) = value {
924 node.original_values = Some(orig);
925 }
926 continue;
927 }
928 let Some(relation) = descriptor.relation_by_name(&field) else {
929 node.values.insert(field, value);
930 continue;
931 };
932
933 match value {
934 Value::Null => {
935 node.relations.entry(field).or_default();
936 }
937 Value::Object(record) => {
938 let child = self.graph_node_from_record(&relation.target_entity, record)?;
939 node.relations.entry(field).or_default().push(child);
940 }
941 Value::List(values) => {
942 let children = node.relations.entry(field.clone()).or_default();
943 for value in values {
944 let Value::Object(record) = value else {
945 return Err(RuntimeError::Graph(format!(
946 "relation {}.{} expects object children, got {:?}",
947 entity, field, value
948 )));
949 };
950 children
951 .push(self.graph_node_from_record(&relation.target_entity, record)?);
952 }
953 }
954 other => {
955 return Err(RuntimeError::Graph(format!(
956 "relation {}.{} expects object/list/null, got {:?}",
957 entity, field, other
958 )));
959 }
960 }
961 }
962
963 Ok(node)
964 }
965
966 fn graph_update_command(
967 &self,
968 node: &mut GraphNode,
969 descriptor: &EntityDescriptor,
970 id_property: &PropertyDescriptor,
971 id: &Value,
972 ) -> Result<UpdateCommand, RepositoryError<E::Error>> {
973 crate::mark_record_status(&mut node.values, crate::CheckObjectStatus::Update);
974 let check_result = self
975 .repository
976 .metadata
977 .context
978 .check_and_fix_record(&node.entity, &mut node.values);
979 crate::clear_record_status(&mut node.values);
980 check_result.map_err(RepositoryError::Runtime)?;
981
982 let mut command = UpdateCommand::new(node.entity.clone(), id.clone());
983 command.old_values = node.original_values.clone();
984 if let Some(version_property) = descriptor.version_property() {
985 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
986 command = command.expected_version(*version);
987 }
988 }
989 for property in descriptor.properties.iter().filter(|property| {
993 !property.is_id
994 && !property.is_version
995 && property.name != id_property.name
996 && match &node.dirty_fields {
997 Some(dirty) => dirty.contains(&property.name),
998 None => node.values.contains_key(&property.name),
999 }
1000 }) {
1001 if let Some(value) = node.values.get(&property.name) {
1002 command.values.insert(property.name.clone(), value.clone());
1003 }
1004 }
1005 Ok(command)
1006 }
1007
1008 fn delete_graph_node<'b, 's: 'b>(
1009 &'b self,
1010 node: &'b GraphNode,
1011 parent_scope: Option<&'s ScopedCommentNode<'s>>,
1012 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u64, RepositoryError<E::Error>>> + Send + '_>> {
1013 Box::pin(async move {
1014 let descriptor = self
1015 .repository
1016 .metadata
1017 .context
1018 .require_entity(&node.entity)
1019 .map_err(RepositoryError::Runtime)?;
1020 let id_property = descriptor.id_property().ok_or_else(|| {
1021 RepositoryError::Runtime(RuntimeError::Graph(format!(
1022 "entity {} has no id property for graph remove",
1023 node.entity
1024 )))
1025 })?;
1026 let id = node
1027 .values
1028 .get(&id_property.name)
1029 .filter(|value| !is_unassigned_id_value(value))
1030 .cloned()
1031 .ok_or_else(|| {
1032 RepositoryError::Runtime(RuntimeError::Graph(format!(
1033 "remove node {} missing id property {}",
1034 node.entity, id_property.name
1035 )))
1036 })?;
1037 let mut delete = DeleteCommand::new(node.entity.clone(), id);
1038 if let Some(version_property) = descriptor.version_property() {
1039 if let Some(Value::I64(version)) = node.values.get(&version_property.name) {
1040 delete = delete.expected_version(*version);
1041 }
1042 }
1043
1044 let current_scope = node.comment.as_ref().map(|c| ScopedCommentNode {
1046 parent: parent_scope,
1047 track: teaql_core::TraceNode {
1048 entity_type: node.entity.clone(),
1049 entity_id: node
1050 .id()
1051 .and_then(|v| match v {
1052 Value::U64(n) => Some(*n),
1053 Value::I64(n) => Some(*n as u64),
1054 _ => None,
1055 }),
1056 comment: c.clone(),
1057 },
1058 });
1059 let active_scope = current_scope.as_ref().or(parent_scope);
1060 let lineage = active_scope.map(|s| s.to_trace_chain()).unwrap_or_default();
1061
1062 self.delete_scoped(&delete, lineage).await
1063 })
1064 }
1065
1066
1067 async fn fetch_graph_children(
1068 &self,
1069 entity: &str,
1070 foreign_key: &str,
1071 parent_value: &Value,
1072 trace_chain: Vec<teaql_core::TraceNode>,
1073 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
1074 let mut query = SelectQuery::new(entity).filter(Expr::eq(foreign_key, parent_value.clone()));
1075 query.trace_chain = trace_chain;
1076 self.scoped_repository(entity.to_owned()).fetch_all(&query).await
1077 }
1078 pub async fn fetch_graph_current_row(
1079 &self,
1080 entity: &str,
1081 id_property: &str,
1082 id: &teaql_core::Value,
1083 trace_chain: Vec<teaql_core::TraceNode>,
1084 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
1085 let mut query = teaql_core::SelectQuery::new(entity).filter(teaql_core::Expr::eq(id_property, id.clone()));
1086 query.trace_chain = trace_chain;
1087 let mut rows = self
1088 .scoped_repository(entity.to_owned())
1089 .fetch_all(&query).await?;
1090 Ok(rows.pop())
1091 }
1092
1093 pub async fn execute_ledger_plan(&self, root: crate::EntityRoot) -> Result<(), RepositoryError<E::Error>> {
1094 let comment = root.get_comment();
1095 let trace_chain = comment.map(|c| vec![teaql_core::TraceNode {
1096 entity_type: self.entity.clone(),
1097 entity_id: None,
1098 comment: c,
1099 }]).unwrap_or_default();
1100
1101 let deleted_keys = root.deleted_keys();
1102 let new_keys = root.new_keys();
1103 let change_set = root.current_change_set();
1104
1105 for key in deleted_keys.iter() {
1107 let id = key.id.clone();
1108 let mut cmd = teaql_core::DeleteCommand::new(&key.entity, id);
1109 if let Some(version) = root.get_original_version(key) {
1110 cmd = cmd.expected_version(version);
1111 }
1112 let t = root.get_trace_chain(key);
1113 cmd.trace_chain = if t.is_empty() { trace_chain.clone() } else { t };
1114 self.delete(&cmd).await?;
1115 }
1116
1117 let mut update_batches: std::collections::BTreeMap<(String, String), Vec<crate::EntityKey>> = std::collections::BTreeMap::new();
1119 let mut insert_batches: std::collections::BTreeMap<String, Vec<crate::EntityKey>> = std::collections::BTreeMap::new();
1120
1121 for (key, record) in change_set.changes() {
1122 if deleted_keys.contains(key) {
1123 continue;
1124 }
1125 let mut is_new = new_keys.contains(key);
1126
1127 if !is_new {
1128 let descriptor = self.repository.metadata.context.require_entity(&key.entity).map_err(RepositoryError::Runtime)?;
1129 let id_property = descriptor.id_property().ok_or_else(|| {
1130 RepositoryError::Runtime(RuntimeError::Graph(format!("entity {} has no id property", key.entity)))
1131 })?;
1132 let t = root.get_trace_chain(key);
1133 let my_trace = if t.is_empty() { trace_chain.clone() } else { t };
1134 let current_row = self.fetch_graph_current_row(&key.entity, &id_property.name, &key.id, my_trace).await?;
1135 if current_row.is_none() {
1136 is_new = true;
1137 }
1138 }
1139
1140 if is_new {
1141 insert_batches.entry(key.entity.clone()).or_default().push(key.clone());
1142 } else {
1143 let mut fields: Vec<String> = record.keys().cloned().collect();
1144 fields.sort();
1145 let signature = fields.join(",");
1146 update_batches.entry((key.entity.clone(), signature)).or_default().push(key.clone());
1147 }
1148 }
1149
1150 let mut insert_order: Vec<String> = insert_batches.keys().cloned().collect();
1151 insert_order.sort();
1152 println!("execute_ledger_plan: insert_batches={:?}", insert_order);
1153
1154 for entity in insert_order {
1155 let keys = insert_batches.get(&entity).unwrap();
1156 let descriptor = self.repository.metadata.context.require_entity(&entity).map_err(RepositoryError::Runtime)?;
1157 let mut cmd = teaql_core::BatchInsertCommand::new(&descriptor.table_name);
1158 let mut traces = Vec::new();
1159 for key in keys {
1160 let record = change_set.changes().get(key).unwrap();
1161 let mut db_record = Record::new();
1162 db_record.insert("id".to_owned(), key.id.clone());
1163 for (field, value) in record {
1164 db_record.insert(field.clone(), value.clone());
1165 }
1166 crate::repository::helpers::ensure_initial_version(&mut db_record, descriptor);
1167 cmd.batch_values.push(db_record);
1168 let t = root.get_trace_chain(key);
1169 let my_trace = if t.is_empty() { trace_chain.clone() } else { t };
1170 traces.push(my_trace);
1171 }
1172 cmd.trace_chains = traces;
1173 self.execute_prepared_batch_insert(cmd).await?;
1174 }
1175
1176 let mut update_order: Vec<(String, String)> = update_batches.keys().cloned().collect();
1177 update_order.sort();
1178 println!("execute_ledger_plan: update_batches={:?}", update_order);
1179
1180 for signature in update_order {
1181 let keys = update_batches.get(&signature).unwrap();
1182 let descriptor = self.repository.metadata.context.require_entity(&signature.0).map_err(RepositoryError::Runtime)?;
1183 let mut update_fields: Vec<String> = signature.0.split(',').map(|s| s.to_string()).collect();
1184 let mut cmd = teaql_core::BatchUpdateCommand::new(&descriptor.table_name, update_fields);
1185 let mut traces = Vec::new();
1186 for key in keys {
1187 let record = change_set.changes().get(key).unwrap();
1188 let mut db_record = Record::new();
1189 db_record.insert("id".to_owned(), key.id.clone());
1190 for (field, value) in record {
1191 db_record.insert(field.clone(), value.clone());
1192 }
1193 crate::repository::helpers::increment_version(&mut db_record, descriptor, root.get_original_version(key));
1194 cmd.batch_values.push(db_record);
1195 let t = root.get_trace_chain(key);
1196 let my_trace = if t.is_empty() { trace_chain.clone() } else { t };
1197 traces.push(my_trace);
1198 }
1199 cmd.trace_chains = traces;
1200 self.execute_prepared_batch_update(cmd).await?;
1201 }
1202
1203 Ok(())
1204 }
1205}