1mod checker;
2mod context;
3mod error;
4mod event;
5mod graph;
6mod id;
7mod language;
8mod memory;
9mod registry;
10mod repository;
11
12pub use context::{SqlLogEntry, SqlLogOperation, SqlLogOptions, UserContext};
13pub use error::{ContextError, RepositoryError, RuntimeError};
14pub use event::{EntityEvent, EntityEventKind, EntityEventSink, InMemoryEntityEventSink};
15pub use graph::{
16 GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem, GraphNode,
17 GraphOperation, sorted_update_fields,
18};
19pub(crate) use id::local_id_generator;
20pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
21pub use language::{
22 BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
23};
24pub use memory::{MemoryRepository, MemoryRepositoryError};
25pub use registry::{
26 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
27 MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
28 RuntimeModule,
29};
30pub use repository::{
31 AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
32 QueryExecutor, RelationLoadPlan, Repository, ResolvedRepository,
33};
34
35#[cfg(feature = "sqlx")]
36pub mod sqlx_support;
37
38#[cfg(test)]
39mod tests {
40 use std::collections::{BTreeMap, VecDeque};
41 use std::sync::{Arc, Mutex};
42
43 use super::{
44 AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResults,
45 Checker, EntityEvent, EntityEventKind, EntityEventSink, GraphMutationKind, GraphNode,
46 GraphTransactionBoundary, InMemoryAggregationCache, InMemoryCheckerRegistry,
47 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
48 InternalIdGenerator, Language, MemoryRepository, MetadataStore, ObjectLocation,
49 QueryExecutor, Repository, RepositoryBehavior, RepositoryError, RuntimeError,
50 RuntimeModule, SqlLogOperation, SqlLogOptions, UserContext, translate_check_result,
51 };
52 use teaql_core::{
53 Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
54 EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
55 RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
56 };
57 use teaql_dialect_pg::PostgresDialect;
58 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
59 use teaql_sql::CompiledQuery;
60
61 fn entity() -> EntityDescriptor {
62 EntityDescriptor::new("Order")
63 .table_name("orders")
64 .property(
65 PropertyDescriptor::new("id", DataType::U64)
66 .column_name("id")
67 .id()
68 .not_null(),
69 )
70 .property(
71 PropertyDescriptor::new("version", DataType::I64)
72 .column_name("version")
73 .version()
74 .not_null(),
75 )
76 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
77 .relation(
78 teaql_core::RelationDescriptor::new("lines", "OrderLine")
79 .local_key("id")
80 .foreign_key("order_id")
81 .many(),
82 )
83 }
84
85 fn line_entity() -> EntityDescriptor {
86 EntityDescriptor::new("OrderLine")
87 .table_name("orderline")
88 .property(
89 PropertyDescriptor::new("id", DataType::U64)
90 .column_name("id")
91 .id()
92 .not_null(),
93 )
94 .property(
95 PropertyDescriptor::new("version", DataType::I64)
96 .column_name("version")
97 .version(),
98 )
99 .property(
100 PropertyDescriptor::new("order_id", DataType::U64)
101 .column_name("order_id")
102 .not_null(),
103 )
104 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
105 .property(
106 PropertyDescriptor::new("product_id", DataType::U64)
107 .column_name("product_id")
108 .not_null(),
109 )
110 .relation(
111 teaql_core::RelationDescriptor::new("product", "Product")
112 .local_key("product_id")
113 .foreign_key("id"),
114 )
115 }
116
117 fn product_entity() -> EntityDescriptor {
118 EntityDescriptor::new("Product")
119 .table_name("product")
120 .property(
121 PropertyDescriptor::new("id", DataType::U64)
122 .column_name("id")
123 .id()
124 .not_null(),
125 )
126 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
127 }
128
129 #[derive(Debug, Default)]
130 struct StubExecutor {
131 affected: u64,
132 rows: Vec<Record>,
133 }
134
135 #[derive(Debug, Default)]
136 struct QueueExecutor {
137 affected: u64,
138 rows: Mutex<VecDeque<Vec<Record>>>,
139 queries: Mutex<Vec<String>>,
140 }
141
142 struct OrderBehavior;
143
144 #[allow(dead_code)]
145 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
146 #[teaql(entity = "CatalogProduct", table = "catalog_product")]
147 struct CatalogProductRow {
148 #[teaql(id)]
149 id: u64,
150 name: String,
151 }
152
153 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
154 #[teaql(entity = "OrderAggregate", table = "orders")]
155 struct OrderAggregateDynamic {
156 #[teaql(id)]
157 id: u64,
158 #[teaql(dynamic)]
159 dynamic: BTreeMap<String, Value>,
160 }
161
162 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
163 #[teaql(entity = "Product", table = "product")]
164 struct ProductEntityRow {
165 #[teaql(id)]
166 id: u64,
167 name: String,
168 }
169
170 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
171 #[teaql(entity = "OrderLine", table = "orderline")]
172 struct OrderLineEntityRow {
173 #[teaql(id)]
174 id: u64,
175 #[teaql(column = "order_id")]
176 order_id: u64,
177 name: String,
178 #[teaql(column = "product_id")]
179 product_id: u64,
180 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
181 product: Option<ProductEntityRow>,
182 }
183
184 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
185 #[teaql(entity = "Order", table = "orders")]
186 struct OrderAggregateRow {
187 #[teaql(id)]
188 id: u64,
189 #[teaql(version)]
190 version: i64,
191 name: String,
192 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
193 lines: teaql_core::SmartList<OrderLineEntityRow>,
194 }
195
196 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
197 #[teaql(entity = "Order", table = "orders")]
198 struct Order {
199 #[teaql(id)]
200 id: u64,
201 #[teaql(version)]
202 version: i64,
203 name: String,
204 }
205
206 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
207 #[teaql(entity = "Product", table = "product")]
208 struct TypedGraphProduct {
209 #[teaql(id)]
210 id: Option<u64>,
211 name: String,
212 }
213
214 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
215 #[teaql(entity = "OrderLine", table = "orderline")]
216 struct TypedGraphLine {
217 #[teaql(id)]
218 id: Option<u64>,
219 #[teaql(column = "order_id")]
220 order_id: Option<u64>,
221 name: String,
222 #[teaql(column = "product_id")]
223 product_id: Option<u64>,
224 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
225 product: Option<TypedGraphProduct>,
226 }
227
228 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
229 #[teaql(entity = "Order", table = "orders")]
230 struct TypedGraphOrder {
231 #[teaql(id)]
232 id: Option<u64>,
233 #[teaql(version)]
234 version: i64,
235 name: String,
236 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
237 lines: teaql_core::SmartList<TypedGraphLine>,
238 }
239
240 #[derive(Debug, PartialEq, Eq)]
241 struct OrderEntity {
242 id: u64,
243 version: i64,
244 name: String,
245 }
246
247 impl teaql_core::TeaqlEntity for OrderEntity {
248 fn entity_descriptor() -> EntityDescriptor {
249 entity()
250 }
251 }
252
253 impl Entity for OrderEntity {
254 fn from_record(record: Record) -> Result<Self, EntityError> {
255 let id = match record.get("id") {
256 Some(Value::U64(v)) => *v,
257 Some(Value::I64(v)) if *v >= 0 => *v as u64,
258 other => {
259 return Err(EntityError::new(
260 "Order",
261 format!("invalid id field: {other:?}"),
262 ));
263 }
264 };
265 let version = match record.get("version") {
266 Some(Value::I64(v)) => *v,
267 other => {
268 return Err(EntityError::new(
269 "Order",
270 format!("invalid version field: {other:?}"),
271 ));
272 }
273 };
274 let name = match record.get("name") {
275 Some(Value::Text(v)) => v.clone(),
276 other => {
277 return Err(EntityError::new(
278 "Order",
279 format!("invalid name field: {other:?}"),
280 ));
281 }
282 };
283 Ok(Self { id, version, name })
284 }
285
286 fn into_record(self) -> Record {
287 Record::from([
288 (String::from("id"), Value::U64(self.id)),
289 (String::from("version"), Value::I64(self.version)),
290 (String::from("name"), Value::Text(self.name)),
291 ])
292 }
293 }
294
295 #[derive(Debug)]
296 struct StubError;
297
298 impl std::fmt::Display for StubError {
299 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300 write!(f, "stub error")
301 }
302 }
303
304 impl std::error::Error for StubError {}
305
306 impl QueryExecutor for StubExecutor {
307 type Error = StubError;
308
309 fn fetch_all(&self, _query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
310 Ok(self.rows.clone())
311 }
312
313 fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
314 Ok(self.affected)
315 }
316
317 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
318 Ok(GraphTransactionBoundary::Started)
319 }
320 }
321
322 impl QueryExecutor for QueueExecutor {
323 type Error = StubError;
324
325 fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
326 self.queries.lock().unwrap().push(query.sql.clone());
327 Ok(self.rows.lock().unwrap().pop_front().unwrap_or_default())
328 }
329
330 fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
331 Ok(self.affected)
332 }
333 }
334
335 #[test]
336 fn user_context_records_configured_sql_logs() {
337 let mut ctx = UserContext::new()
338 .with_module(crate::module!(Order))
339 .with_sql_log_options(SqlLogOptions::select_only());
340 ctx.insert_resource(PostgresDialect);
341 ctx.insert_resource(StubExecutor {
342 affected: 1,
343 rows: Vec::new(),
344 });
345
346 {
347 let repo = ctx
348 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
349 .unwrap();
350 repo.fetch_all(&SelectQuery::new("Order").filter(Expr::eq("name", "Bob's Shop")))
351 .unwrap();
352 repo.insert(&InsertCommand::new("Order").value("name", "created"))
353 .unwrap();
354 }
355
356 let logs = ctx.sql_logs();
357 assert_eq!(logs.len(), 1);
358 assert_eq!(logs[0].operation, SqlLogOperation::Select);
359 assert_eq!(
360 logs[0].debug_sql,
361 "SELECT * FROM \"orders\" WHERE (\"name\" = 'Bob''s Shop')"
362 );
363
364 ctx.set_sql_log_options(SqlLogOptions::mutation_only());
365 ctx.clear_sql_logs();
366 ctx.resolve_repository::<PostgresDialect, StubExecutor>("Order")
367 .unwrap()
368 .update(
369 &UpdateCommand::new("Order", 1_u64)
370 .value("name", "updated")
371 .expected_version(1),
372 )
373 .unwrap();
374
375 let logs = ctx.sql_logs();
376 assert_eq!(logs.len(), 1);
377 assert_eq!(logs[0].operation, SqlLogOperation::Update);
378 assert!(logs[0].debug_sql.contains("UPDATE \"orders\" SET"));
379 assert!(logs[0].debug_sql.contains("'updated'"));
380 }
381
382 impl RepositoryBehavior for OrderBehavior {
383 fn before_select(
384 &self,
385 _ctx: &UserContext,
386 query: &mut teaql_core::SelectQuery,
387 ) -> Result<(), RuntimeError> {
388 query.filter = Some(Expr::eq("version", 1_i64));
389 Ok(())
390 }
391
392 fn before_insert(
393 &self,
394 _ctx: &UserContext,
395 command: &mut InsertCommand,
396 ) -> Result<(), RuntimeError> {
397 command
398 .values
399 .entry("version".to_owned())
400 .or_insert(Value::I64(1));
401 Ok(())
402 }
403
404 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
405 vec!["lines".to_owned()]
406 }
407 }
408
409 struct ContextAwareOrderBehavior;
410 struct OrderChecker;
411 #[derive(Clone)]
412 struct RecordingEventSink {
413 events: Arc<Mutex<Vec<EntityEvent>>>,
414 }
415
416 impl RepositoryBehavior for ContextAwareOrderBehavior {
417 fn before_insert(
418 &self,
419 ctx: &UserContext,
420 command: &mut InsertCommand,
421 ) -> Result<(), RuntimeError> {
422 let tenant = ctx
423 .get_named_resource::<String>("tenant")
424 .cloned()
425 .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
426 let version = *ctx
427 .get_named_resource::<i64>("initial_version")
428 .ok_or_else(|| {
429 RuntimeError::Behavior("missing initial_version resource".to_owned())
430 })?;
431 let trace_id = match ctx.local("trace_id") {
432 Some(Value::Text(value)) => value.clone(),
433 other => {
434 return Err(RuntimeError::Behavior(format!(
435 "missing trace_id local, got {other:?}"
436 )));
437 }
438 };
439
440 command
441 .values
442 .entry("name".to_owned())
443 .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
444 command
445 .values
446 .entry("version".to_owned())
447 .or_insert(Value::I64(version));
448 Ok(())
449 }
450 }
451
452 impl Checker for OrderChecker {
453 fn entity(&self) -> &str {
454 "Order"
455 }
456
457 fn check_and_fix(
458 &self,
459 _ctx: &UserContext,
460 record: &mut Record,
461 location: &ObjectLocation,
462 results: &mut CheckResults,
463 ) {
464 let status = CheckObjectStatus::from_record(record);
465 if status.is_create() {
466 self.required(record, "name", location, results);
467 record.entry("version".to_owned()).or_insert(Value::I64(1));
468 }
469 if status.is_update()
470 && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
471 {
472 record.insert(
473 "name".to_owned(),
474 Value::Text("graph-update-checked".to_owned()),
475 );
476 }
477 self.min_string_length(record, "name", 3, location, results);
478 }
479 }
480
481 impl EntityEventSink for RecordingEventSink {
482 fn on_event(&self, _ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
483 self.events.lock().unwrap().push(event.clone());
484 Ok(())
485 }
486 }
487
488 struct FixedIdGenerator(u64);
489
490 impl InternalIdGenerator for FixedIdGenerator {
491 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
492 Ok(self.0)
493 }
494 }
495
496 struct SequentialIdGenerator {
497 next: Mutex<u64>,
498 }
499
500 impl SequentialIdGenerator {
501 fn new(next: u64) -> Self {
502 Self {
503 next: Mutex::new(next),
504 }
505 }
506 }
507
508 impl InternalIdGenerator for SequentialIdGenerator {
509 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
510 let mut next = self
511 .next
512 .lock()
513 .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
514 let id = *next;
515 *next += 1;
516 Ok(id)
517 }
518 }
519
520 #[test]
521 fn metadata_store_registers_entities() {
522 let store = InMemoryMetadataStore::new().with_entity(entity());
523 assert!(store.entity("Order").is_some());
524 }
525
526 #[test]
527 fn runtime_module_registers_descriptor_into_context() {
528 let ctx = UserContext::new().with_module(RuntimeModule::new().descriptor(entity()));
529 assert!(ctx.entity("Order").is_some());
530 assert!(ctx.has_repository("Order"));
531 }
532
533 #[test]
534 fn runtime_module_registers_derived_entity_and_behavior() {
535 let ctx = UserContext::new().with_module(
536 RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior),
537 );
538 assert!(ctx.entity("CatalogProduct").is_some());
539 assert!(ctx.has_repository("CatalogProduct"));
540 assert!(ctx.repository_behavior("CatalogProduct").is_some());
541 }
542
543 #[test]
544 fn module_macro_registers_multiple_entities() {
545 let ctx = UserContext::new().with_module(crate::module!(CatalogProductRow));
546 assert!(ctx.entity("CatalogProduct").is_some());
547 assert!(ctx.has_repository("CatalogProduct"));
548 }
549
550 #[test]
551 fn module_macro_registers_entity_behavior_pairs() {
552 let ctx =
553 UserContext::new().with_module(crate::module!(CatalogProductRow => OrderBehavior));
554 assert!(ctx.entity("CatalogProduct").is_some());
555 assert!(ctx.repository_behavior("CatalogProduct").is_some());
556 }
557
558 #[test]
559 fn repository_returns_optimistic_lock_conflict() {
560 let store = InMemoryMetadataStore::new().with_entity(entity());
561 let executor = StubExecutor {
562 affected: 0,
563 rows: Vec::new(),
564 };
565 let repo = Repository::new(&PostgresDialect, &store, &executor);
566
567 let err = repo
568 .update(
569 &UpdateCommand::new("Order", 1_u64)
570 .expected_version(3)
571 .value("name", "next"),
572 )
573 .unwrap_err();
574
575 match err {
576 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. }) => {}
577 other => panic!("unexpected error: {other}"),
578 }
579 }
580
581 #[test]
582 fn user_context_indexes_resources_and_locals() {
583 let mut ctx =
584 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
585 ctx.insert_resource::<u64>(42);
586 ctx.insert_named_resource("tenant", String::from("acme"));
587 ctx.put_local("trace_id", "req-1");
588
589 assert!(ctx.entity("Order").is_some());
590 assert_eq!(ctx.get_resource::<u64>(), Some(&42));
591 assert_eq!(
592 ctx.get_named_resource::<String>("tenant"),
593 Some(&String::from("acme"))
594 );
595 assert_eq!(
596 ctx.local("trace_id"),
597 Some(&Value::Text("req-1".to_owned()))
598 );
599 }
600
601 #[test]
602 fn user_context_builds_context_repository() {
603 let mut ctx =
604 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
605 ctx.insert_resource(PostgresDialect);
606 ctx.insert_resource(StubExecutor {
607 affected: 1,
608 rows: Vec::new(),
609 });
610
611 let repo = ctx.repository::<PostgresDialect, StubExecutor>().unwrap();
612 let affected = repo
613 .update(
614 &UpdateCommand::new("Order", 1_u64)
615 .expected_version(3)
616 .value("name", "next"),
617 )
618 .unwrap();
619
620 assert_eq!(affected, 1);
621 }
622
623 #[test]
624 fn user_context_resolves_repository_by_entity_type() {
625 let mut ctx = UserContext::new()
626 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
627 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
628 ctx.insert_resource(PostgresDialect);
629 ctx.insert_resource(StubExecutor {
630 affected: 1,
631 rows: Vec::new(),
632 });
633
634 let repo = ctx
635 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
636 .unwrap();
637 assert_eq!(repo.entity(), "Order");
638 assert_eq!(repo.select().entity, "Order");
639
640 let affected = repo
641 .insert(
642 &repo
643 .insert_command()
644 .value("id", 1_u64)
645 .value("version", 1_i64)
646 .value("name", "n"),
647 )
648 .unwrap();
649 assert_eq!(affected, 1);
650 }
651
652 #[test]
653 fn resolved_repository_applies_behavior_hooks() {
654 let mut ctx = UserContext::new()
655 .with_metadata(
656 InMemoryMetadataStore::new()
657 .with_entity(entity())
658 .with_entity(line_entity())
659 .with_entity(product_entity()),
660 )
661 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
662 .with_repository_behavior_registry(
663 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
664 );
665 ctx.insert_resource(PostgresDialect);
666 ctx.insert_resource(StubExecutor {
667 affected: 1,
668 rows: Vec::new(),
669 });
670
671 let repo = ctx
672 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
673 .unwrap();
674
675 let compiled = repo.compile(&repo.select()).unwrap();
676 assert!(compiled.sql.contains("WHERE (\"version\" = $1)"));
677
678 let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
679 let affected = repo.insert(&insert).unwrap();
680 assert_eq!(affected, 1);
681 assert_eq!(repo.relation_loads(), vec!["lines".to_owned()]);
682 }
683
684 #[test]
685 fn resolved_repository_prepares_insert_command_with_generated_id() {
686 let mut ctx = UserContext::new()
687 .with_metadata(
688 InMemoryMetadataStore::new()
689 .with_entity(entity())
690 .with_entity(line_entity())
691 .with_entity(product_entity()),
692 )
693 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
694 .with_repository_behavior_registry(
695 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
696 )
697 .with_internal_id_generator(FixedIdGenerator(42));
698 ctx.insert_resource(PostgresDialect);
699 ctx.insert_resource(StubExecutor {
700 affected: 1,
701 rows: Vec::new(),
702 });
703
704 let repo = ctx
705 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
706 .unwrap();
707
708 let prepared = repo
709 .prepare_insert_command(&repo.insert_command().value("name", "n"))
710 .unwrap();
711
712 assert_eq!(prepared.values.get("id"), Some(&Value::U64(42)));
713 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
714 assert_eq!(
715 prepared.values.get("name"),
716 Some(&Value::Text("n".to_owned()))
717 );
718 }
719
720 #[test]
721 fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
722 let mut ctx = UserContext::new()
723 .with_metadata(
724 InMemoryMetadataStore::new()
725 .with_entity(entity())
726 .with_entity(line_entity())
727 .with_entity(product_entity()),
728 )
729 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
730 .with_repository_behavior_registry(
731 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
732 )
733 .with_internal_id_generator(SequentialIdGenerator::new(500));
734 ctx.insert_resource(PostgresDialect);
735 ctx.insert_resource(StubExecutor {
736 affected: 1,
737 rows: Vec::new(),
738 });
739
740 let repo = ctx
741 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
742 .unwrap();
743 let graph = GraphNode::new("Order").value("name", "root").relation(
744 "lines",
745 GraphNode::new("OrderLine")
746 .value("name", "line-1")
747 .relation("product", GraphNode::new("Product").value("name", "sku-1")),
748 );
749
750 let saved = repo.save_graph(graph).unwrap();
751
752 assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
753 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
754 let lines = saved.relations.get("lines").unwrap();
755 assert_eq!(lines.len(), 1);
756 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(501)));
757 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(500)));
758 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(502)));
759 let product = lines[0].relations.get("product").unwrap();
760 assert_eq!(product[0].values.get("id"), Some(&Value::U64(502)));
761 }
762
763 #[test]
764 fn resolved_repository_extracts_and_saves_typed_entity_graph() {
765 let mut ctx = UserContext::new()
766 .with_metadata(
767 InMemoryMetadataStore::new()
768 .with_entity(entity())
769 .with_entity(line_entity())
770 .with_entity(product_entity()),
771 )
772 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
773 .with_internal_id_generator(SequentialIdGenerator::new(700));
774 ctx.insert_resource(PostgresDialect);
775 ctx.insert_resource(StubExecutor {
776 affected: 1,
777 rows: Vec::new(),
778 });
779
780 let repo = ctx
781 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
782 .unwrap();
783 let order = TypedGraphOrder {
784 id: None,
785 version: 1,
786 name: "typed-root".to_owned(),
787 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
788 id: None,
789 order_id: None,
790 name: "typed-line".to_owned(),
791 product_id: None,
792 product: Some(TypedGraphProduct {
793 id: None,
794 name: "typed-product".to_owned(),
795 }),
796 }]),
797 };
798
799 let extracted = repo.graph_node_from_entity(order).unwrap();
800 assert_eq!(extracted.entity, "Order");
801 assert_eq!(
802 extracted.values.get("name"),
803 Some(&Value::Text("typed-root".to_owned()))
804 );
805 assert_eq!(extracted.values.get("id"), Some(&Value::Null));
806 assert_eq!(extracted.relations["lines"].len(), 1);
807 assert_eq!(
808 extracted.relations["lines"][0].values.get("name"),
809 Some(&Value::Text("typed-line".to_owned()))
810 );
811 assert_eq!(
812 extracted.relations["lines"][0].relations["product"].len(),
813 1
814 );
815
816 let saved = repo.save_graph(extracted).unwrap();
817 assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
818 let lines = saved.relations.get("lines").unwrap();
819 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(701)));
820 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(700)));
821 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(702)));
822 assert_eq!(
823 lines[0].relations["product"][0].values.get("id"),
824 Some(&Value::U64(702))
825 );
826 }
827
828 #[test]
829 fn resolved_repository_saves_typed_entity_graph_directly() {
830 let mut ctx = UserContext::new()
831 .with_metadata(
832 InMemoryMetadataStore::new()
833 .with_entity(entity())
834 .with_entity(line_entity())
835 .with_entity(product_entity()),
836 )
837 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
838 .with_internal_id_generator(SequentialIdGenerator::new(800));
839 ctx.insert_resource(PostgresDialect);
840 ctx.insert_resource(StubExecutor {
841 affected: 1,
842 rows: Vec::new(),
843 });
844
845 let repo = ctx
846 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
847 .unwrap();
848 let saved = repo
849 .save_entity_graph(TypedGraphOrder {
850 id: None,
851 version: 1,
852 name: "typed-direct".to_owned(),
853 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
854 id: None,
855 order_id: None,
856 name: "typed-line".to_owned(),
857 product_id: None,
858 product: Some(TypedGraphProduct {
859 id: None,
860 name: "typed-product".to_owned(),
861 }),
862 }]),
863 })
864 .unwrap();
865
866 assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
867 assert_eq!(
868 saved.relations["lines"][0].values.get("order_id"),
869 Some(&Value::U64(800))
870 );
871 assert_eq!(
872 saved.relations["lines"][0].values.get("product_id"),
873 Some(&Value::U64(802))
874 );
875 }
876
877 #[test]
878 fn custom_user_context_can_drive_insert_preparation() {
879 let mut ctx = UserContext::new()
880 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
881 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
882 .with_repository_behavior_registry(
883 InMemoryRepositoryBehaviorRegistry::new()
884 .with_behavior("Order", ContextAwareOrderBehavior),
885 )
886 .with_internal_id_generator(FixedIdGenerator(99));
887 ctx.insert_named_resource("tenant", String::from("acme"));
888 ctx.insert_named_resource("initial_version", 7_i64);
889 ctx.put_local("trace_id", "req-9");
890 ctx.insert_resource(PostgresDialect);
891 ctx.insert_resource(StubExecutor {
892 affected: 1,
893 rows: Vec::new(),
894 });
895
896 let repo = ctx
897 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
898 .unwrap();
899 let prepared = repo.prepare_insert_command(&repo.insert_command()).unwrap();
900
901 assert_eq!(prepared.values.get("id"), Some(&Value::U64(99)));
902 assert_eq!(prepared.values.get("version"), Some(&Value::I64(7)));
903 assert_eq!(
904 prepared.values.get("name"),
905 Some(&Value::Text("acme:req-9".to_owned()))
906 );
907 }
908
909 #[test]
910 fn checker_registry_validates_and_fixes_insert_commands() {
911 let mut ctx = UserContext::new()
912 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
913 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
914 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
915 .with_internal_id_generator(FixedIdGenerator(77));
916 ctx.insert_resource(PostgresDialect);
917 ctx.insert_resource(StubExecutor {
918 affected: 1,
919 rows: Vec::new(),
920 });
921
922 let repo = ctx
923 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
924 .unwrap();
925 let prepared = repo
926 .prepare_insert_command(&repo.insert_command().value("name", "valid"))
927 .unwrap();
928
929 assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
930 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
931 assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
932
933 let error = repo
934 .prepare_insert_command(&repo.insert_command().value("name", "no"))
935 .unwrap_err();
936 match error {
937 RuntimeError::Check(results) => {
938 assert_eq!(results.len(), 1);
939 assert_eq!(results[0].location.to_string(), "name");
940 }
941 other => panic!("unexpected checker error: {other:?}"),
942 }
943 }
944
945 #[test]
946 fn checker_registry_validates_update_commands_without_required_insert_checks() {
947 let mut ctx = UserContext::new()
948 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
949 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
950 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
951 ctx.insert_resource(PostgresDialect);
952 ctx.insert_resource(StubExecutor {
953 affected: 1,
954 rows: Vec::new(),
955 });
956
957 let repo = ctx
958 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
959 .unwrap();
960 repo.update(&repo.update_command(1_u64).value("version", 1_i64))
961 .unwrap();
962
963 let error = repo
964 .update(&repo.update_command(1_u64).value("name", "no"))
965 .unwrap_err();
966 match error {
967 RepositoryError::Runtime(RuntimeError::Check(results)) => {
968 assert_eq!(results.len(), 1);
969 assert_eq!(results[0].location.to_string(), "name");
970 }
971 other => panic!("unexpected checker error: {other:?}"),
972 }
973 }
974
975 #[test]
976 fn built_in_language_translators_cover_fifteen_languages() {
977 assert_eq!(Language::ALL.len(), 15);
978 let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
979 let messages = Language::ALL
980 .iter()
981 .map(|language| translate_check_result(*language, &result))
982 .collect::<Vec<_>>();
983
984 assert!(messages.iter().all(|message| !message.is_empty()));
985 assert!(messages.iter().any(|message| message.contains("required")));
986 assert!(messages.iter().any(|message| message.contains("å¿…å¡«")));
987 assert!(
988 messages
989 .iter()
990 .any(|message| message.contains("obligatoire"))
991 );
992 assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
993 assert_eq!(
994 Language::from_code("zh-TW"),
995 Some(Language::TraditionalChinese)
996 );
997 }
998
999 #[test]
1000 fn user_context_language_switch_translates_checker_errors() {
1001 let mut ctx = UserContext::new()
1002 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1003 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1004 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1005 .with_internal_id_generator(FixedIdGenerator(77))
1006 .with_language(Language::Chinese);
1007 ctx.insert_resource(PostgresDialect);
1008 ctx.insert_resource(StubExecutor {
1009 affected: 1,
1010 rows: Vec::new(),
1011 });
1012
1013 let repo = ctx
1014 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1015 .unwrap();
1016 let error = repo
1017 .prepare_insert_command(&repo.insert_command())
1018 .unwrap_err();
1019 match error {
1020 RuntimeError::Check(results) => {
1021 assert_eq!(results.len(), 1);
1022 assert!(
1023 results[0]
1024 .message
1025 .as_ref()
1026 .is_some_and(|message| message.contains("å¿…å¡«"))
1027 );
1028 }
1029 other => panic!("unexpected checker error: {other:?}"),
1030 }
1031
1032 let mut ctx = UserContext::new().with_language(Language::English);
1033 ctx.set_language_code("es").unwrap();
1034 assert_eq!(ctx.language(), Language::Spanish);
1035 }
1036
1037 #[test]
1038 fn checker_registry_merges_graph_update_fixes_by_object_status() {
1039 let mut ctx = UserContext::new()
1040 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1041 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1042 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1043 ctx.insert_resource(PostgresDialect);
1044 ctx.insert_resource(StubExecutor {
1045 affected: 1,
1046 rows: vec![Record::from([
1047 ("id".to_owned(), Value::U64(1)),
1048 ("version".to_owned(), Value::I64(1)),
1049 ("name".to_owned(), Value::Text("old".to_owned())),
1050 ])],
1051 });
1052
1053 let repo = ctx
1054 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1055 .unwrap();
1056 let saved = repo
1057 .save_graph(
1058 GraphNode::new("Order")
1059 .value("id", 1_u64)
1060 .value("version", 1_i64)
1061 .value("name", "graph-update"),
1062 )
1063 .unwrap();
1064
1065 assert_eq!(
1066 saved.values.get("name"),
1067 Some(&Value::Text("graph-update-checked".to_owned()))
1068 );
1069 assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
1070 assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1071 }
1072
1073 #[test]
1074 fn user_context_event_sink_receives_repository_mutation_events() {
1075 let events = Arc::new(Mutex::new(Vec::new()));
1076 let mut ctx = UserContext::new()
1077 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1078 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1079 .with_internal_id_generator(FixedIdGenerator(88))
1080 .with_event_sink(RecordingEventSink {
1081 events: events.clone(),
1082 });
1083 ctx.insert_resource(PostgresDialect);
1084 ctx.insert_resource(StubExecutor {
1085 affected: 1,
1086 rows: Vec::new(),
1087 });
1088
1089 let repo = ctx
1090 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1091 .unwrap();
1092 repo.insert(&repo.insert_command().value("name", "created"))
1093 .unwrap();
1094 repo.update(
1095 &repo
1096 .update_command(88_u64)
1097 .expected_version(1)
1098 .value("name", "updated"),
1099 )
1100 .unwrap();
1101 repo.delete(&repo.delete_command(88_u64).expected_version(2))
1102 .unwrap();
1103 repo.recover(&repo.recover_command(88_u64, -3)).unwrap();
1104
1105 let events = events.lock().unwrap();
1106 assert_eq!(events.len(), 4);
1107 assert_eq!(events[0].kind, EntityEventKind::Created);
1108 assert_eq!(events[0].entity, "Order");
1109 assert_eq!(events[0].values.get("id"), Some(&Value::U64(88)));
1110 assert_eq!(events[1].kind, EntityEventKind::Updated);
1111 assert_eq!(events[1].values.get("id"), Some(&Value::U64(88)));
1112 assert_eq!(events[1].values.get("version"), Some(&Value::I64(2)));
1113 assert_eq!(events[1].updated_fields, vec!["name".to_owned()]);
1114 assert_eq!(events[2].kind, EntityEventKind::Deleted);
1115 assert_eq!(events[3].kind, EntityEventKind::Recovered);
1116 }
1117
1118 #[test]
1119 fn user_context_event_sink_receives_mixed_graph_mutation_events() {
1120 let events = Arc::new(Mutex::new(Vec::new()));
1121 let mut ctx = UserContext::new()
1122 .with_metadata(
1123 InMemoryMetadataStore::new()
1124 .with_entity(entity())
1125 .with_entity(line_entity())
1126 .with_entity(product_entity()),
1127 )
1128 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1129 .with_event_sink(RecordingEventSink {
1130 events: events.clone(),
1131 });
1132 ctx.insert_resource(PostgresDialect);
1133 ctx.insert_resource(StubExecutor {
1134 affected: 1,
1135 rows: vec![Record::from([
1136 ("id".to_owned(), Value::U64(1)),
1137 ("version".to_owned(), Value::I64(1)),
1138 ("name".to_owned(), Value::Text("old".to_owned())),
1139 ])],
1140 });
1141
1142 let repo = ctx
1143 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1144 .unwrap();
1145 repo.save_graph(
1146 GraphNode::new("Order")
1147 .value("id", 1_u64)
1148 .value("version", 1_i64)
1149 .value("name", "updated")
1150 .relation(
1151 "lines",
1152 GraphNode::new("OrderLine")
1153 .value("name", "line")
1154 .value("product_id", 3_u64),
1155 ),
1156 )
1157 .unwrap();
1158
1159 let events = events.lock().unwrap();
1160 assert_eq!(events.len(), 3);
1161 assert_eq!(events[0].kind, EntityEventKind::Updated);
1162 assert_eq!(events[0].entity, "Order");
1163 assert_eq!(events[1].kind, EntityEventKind::Updated);
1164 assert_eq!(events[1].entity, "OrderLine");
1165 assert_eq!(events[1].values.get("order_id"), Some(&Value::U64(1)));
1166 assert_eq!(events[2].kind, EntityEventKind::Deleted);
1167 assert_eq!(events[2].entity, "OrderLine");
1168 }
1169
1170 #[test]
1171 fn save_graph_builds_plan_grouped_by_entity_and_operation() {
1172 let mut ctx = UserContext::new()
1173 .with_metadata(
1174 InMemoryMetadataStore::new()
1175 .with_entity(entity())
1176 .with_entity(line_entity())
1177 .with_entity(product_entity()),
1178 )
1179 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1180 .with_internal_id_generator(SequentialIdGenerator::new(500));
1181 ctx.insert_resource(PostgresDialect);
1182 ctx.insert_resource(StubExecutor {
1183 affected: 1,
1184 rows: vec![Record::from([
1185 ("id".to_owned(), Value::U64(1)),
1186 ("version".to_owned(), Value::I64(1)),
1187 ("name".to_owned(), Value::Text("old".to_owned())),
1188 ])],
1189 });
1190
1191 let plan = ctx
1192 .plan_for_save_graph::<PostgresDialect, StubExecutor>(
1193 GraphNode::new("Order")
1194 .value("id", 1_u64)
1195 .value("version", 1_i64)
1196 .value("name", "updated")
1197 .relation(
1198 "lines",
1199 GraphNode::new("OrderLine")
1200 .value("name", "new-line-a")
1201 .value("product_id", 2_u64),
1202 )
1203 .relation(
1204 "lines",
1205 GraphNode::new("OrderLine")
1206 .value("name", "new-line-b")
1207 .value("product_id", 2_u64),
1208 )
1209 .relation(
1210 "lines",
1211 GraphNode::new("OrderLine")
1212 .value("id", 5_u64)
1213 .value("version", 1_i64)
1214 .value("name", "same-update-a"),
1215 )
1216 .relation(
1217 "lines",
1218 GraphNode::new("OrderLine")
1219 .value("id", 6_u64)
1220 .value("version", 1_i64)
1221 .value("name", "same-update-b"),
1222 )
1223 .relation(
1224 "lines",
1225 GraphNode::new("OrderLine").value("id", 3_u64).remove(),
1226 )
1227 .relation(
1228 "lines",
1229 GraphNode::new("OrderLine").value("id", 4_u64).reference(),
1230 ),
1231 )
1232 .unwrap();
1233 let counts = plan.grouped_counts();
1234
1235 assert_eq!(
1236 counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
1237 Some(&1)
1238 );
1239 assert_eq!(
1240 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
1241 Some(&2)
1242 );
1243 assert_eq!(
1244 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
1245 Some(&2)
1246 );
1247 assert_eq!(
1248 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Delete)),
1249 Some(&1)
1250 );
1251 assert_eq!(
1252 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Reference)),
1253 Some(&1)
1254 );
1255 let create_batch = plan
1256 .batches
1257 .iter()
1258 .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
1259 .unwrap();
1260 assert_eq!(create_batch.items.len(), 2);
1261 assert_eq!(
1262 create_batch.items[0].values.get("id"),
1263 Some(&Value::U64(500))
1264 );
1265 assert_eq!(
1266 create_batch.items[1].values.get("id"),
1267 Some(&Value::U64(501))
1268 );
1269 let update_batch = plan
1270 .batches
1271 .iter()
1272 .find(|batch| {
1273 batch.entity == "OrderLine"
1274 && batch.kind == GraphMutationKind::Update
1275 && batch.update_fields == vec!["name".to_owned()]
1276 })
1277 .unwrap();
1278 assert_eq!(update_batch.items.len(), 2);
1279 }
1280
1281 #[test]
1282 fn resolved_repository_builds_relation_plans() {
1283 let mut ctx = UserContext::new()
1284 .with_metadata(
1285 InMemoryMetadataStore::new()
1286 .with_entity(entity())
1287 .with_entity(line_entity())
1288 .with_entity(product_entity()),
1289 )
1290 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1291 .with_repository_behavior_registry(
1292 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1293 );
1294 ctx.insert_resource(PostgresDialect);
1295 ctx.insert_resource(StubExecutor {
1296 affected: 1,
1297 rows: Vec::new(),
1298 });
1299
1300 let repo = ctx
1301 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1302 .unwrap();
1303 let plans = repo.relation_plans().unwrap();
1304
1305 assert_eq!(plans.len(), 1);
1306 assert_eq!(plans[0].relation_name, "lines");
1307 assert_eq!(plans[0].target_entity, "OrderLine");
1308 assert_eq!(plans[0].local_key, "id");
1309 assert_eq!(plans[0].foreign_key, "order_id");
1310 assert!(plans[0].many);
1311 }
1312
1313 #[test]
1314 fn resolved_repository_builds_relation_query_from_parent_rows() {
1315 let mut ctx = UserContext::new()
1316 .with_metadata(
1317 InMemoryMetadataStore::new()
1318 .with_entity(entity())
1319 .with_entity(line_entity())
1320 .with_entity(product_entity()),
1321 )
1322 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1323 .with_repository_behavior_registry(
1324 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1325 );
1326 ctx.insert_resource(PostgresDialect);
1327 ctx.insert_resource(StubExecutor {
1328 affected: 1,
1329 rows: Vec::new(),
1330 });
1331
1332 let repo = ctx
1333 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1334 .unwrap();
1335 let parent_rows = vec![
1336 Record::from([(String::from("id"), Value::U64(11))]),
1337 Record::from([(String::from("id"), Value::U64(12))]),
1338 ];
1339
1340 let query = repo.relation_query("lines", &parent_rows).unwrap();
1341 let compiled = repo.compile(&query).unwrap();
1342 assert!(compiled.sql.contains("FROM \"orderline\""));
1343 assert!(compiled.sql.contains("\"order_id\" IN ($1, $2)"));
1344 assert_eq!(compiled.params, vec![Value::U64(11), Value::U64(12)]);
1345 }
1346
1347 #[test]
1348 fn resolved_repository_enhances_parent_rows_with_relations() {
1349 let mut ctx = UserContext::new()
1350 .with_metadata(
1351 InMemoryMetadataStore::new()
1352 .with_entity(entity())
1353 .with_entity(line_entity())
1354 .with_entity(product_entity()),
1355 )
1356 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1357 .with_repository_behavior_registry(
1358 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1359 );
1360 ctx.insert_resource(PostgresDialect);
1361 ctx.insert_resource(StubExecutor {
1362 affected: 1,
1363 rows: vec![
1364 Record::from([
1365 (String::from("id"), Value::U64(101)),
1366 (String::from("order_id"), Value::U64(11)),
1367 (String::from("name"), Value::Text(String::from("l1"))),
1368 ]),
1369 Record::from([
1370 (String::from("id"), Value::U64(102)),
1371 (String::from("order_id"), Value::U64(11)),
1372 (String::from("name"), Value::Text(String::from("l2"))),
1373 ]),
1374 Record::from([
1375 (String::from("id"), Value::U64(201)),
1376 (String::from("order_id"), Value::U64(12)),
1377 (String::from("name"), Value::Text(String::from("l3"))),
1378 ]),
1379 ],
1380 });
1381
1382 let repo = ctx
1383 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1384 .unwrap();
1385 let mut parents = vec![
1386 Record::from([(String::from("id"), Value::U64(11))]),
1387 Record::from([(String::from("id"), Value::U64(12))]),
1388 ];
1389
1390 repo.enhance_relations(&mut parents).unwrap();
1391
1392 match parents[0].get("lines") {
1393 Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
1394 other => panic!("unexpected lines payload: {other:?}"),
1395 }
1396 match parents[1].get("lines") {
1397 Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
1398 other => panic!("unexpected lines payload: {other:?}"),
1399 }
1400 }
1401
1402 #[test]
1403 fn resolved_repository_fetches_smart_list_of_entities() {
1404 let mut ctx = UserContext::new()
1405 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1406 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1407 ctx.insert_resource(PostgresDialect);
1408 ctx.insert_resource(StubExecutor {
1409 affected: 1,
1410 rows: vec![Record::from([
1411 (String::from("id"), Value::U64(7)),
1412 (String::from("version"), Value::I64(2)),
1413 (String::from("name"), Value::Text(String::from("typed"))),
1414 ])],
1415 });
1416
1417 let repo = ctx
1418 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1419 .unwrap();
1420 let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).unwrap();
1421
1422 assert_eq!(rows.len(), 1);
1423 assert_eq!(
1424 rows.first(),
1425 Some(&OrderEntity {
1426 id: 7,
1427 version: 2,
1428 name: String::from("typed"),
1429 })
1430 );
1431 }
1432
1433 #[test]
1434 fn resolved_repository_fetches_smart_list_of_derived_entities() {
1435 let mut ctx = UserContext::new()
1436 .with_metadata(
1437 InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor()),
1438 )
1439 .with_repository_registry(
1440 InMemoryRepositoryRegistry::new().with_entity("CatalogProduct"),
1441 );
1442 ctx.insert_resource(PostgresDialect);
1443 ctx.insert_resource(StubExecutor {
1444 affected: 1,
1445 rows: vec![Record::from([
1446 (String::from("id"), Value::U64(9)),
1447 (String::from("name"), Value::Text(String::from("derived"))),
1448 ])],
1449 });
1450
1451 let repo = ctx
1452 .resolve_repository::<PostgresDialect, StubExecutor>("CatalogProduct")
1453 .unwrap();
1454 let rows = repo
1455 .fetch_entities::<CatalogProductRow>(&repo.select())
1456 .unwrap();
1457
1458 assert_eq!(rows.len(), 1);
1459 assert_eq!(
1460 rows.first(),
1461 Some(&CatalogProductRow {
1462 id: 9,
1463 name: String::from("derived"),
1464 })
1465 );
1466 }
1467
1468 #[test]
1469 fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
1470 let mut ctx = UserContext::new()
1471 .with_metadata(
1472 InMemoryMetadataStore::new()
1473 .with_entity(OrderAggregateDynamic::entity_descriptor()),
1474 )
1475 .with_repository_registry(
1476 InMemoryRepositoryRegistry::new().with_entity("OrderAggregate"),
1477 );
1478 ctx.insert_resource(PostgresDialect);
1479 ctx.insert_resource(StubExecutor {
1480 affected: 1,
1481 rows: vec![Record::from([
1482 (String::from("id"), Value::U64(1)),
1483 (String::from("lineCount"), Value::I64(3)),
1484 (String::from("amount"), Value::F64(18.5)),
1485 ])],
1486 });
1487
1488 let repo = ctx
1489 .resolve_repository::<PostgresDialect, StubExecutor>("OrderAggregate")
1490 .unwrap();
1491 let rows = repo
1492 .fetch_entities::<OrderAggregateDynamic>(&repo.select())
1493 .unwrap();
1494
1495 assert_eq!(rows.len(), 1);
1496 assert_eq!(rows.data[0].id, 1);
1497 assert_eq!(rows.data[0].dynamic.get("lineCount"), Some(&Value::I64(3)));
1498 assert_eq!(rows.data[0].dynamic.get("amount"), Some(&Value::F64(18.5)));
1499 assert_eq!(
1500 rows.into_vec().into_iter().next().unwrap().into_json(),
1501 serde_json::json!({
1502 "id": 1,
1503 "lineCount": 3,
1504 "amount": 18.5
1505 })
1506 );
1507 }
1508
1509 #[test]
1510 fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
1511 let executor = QueueExecutor {
1512 affected: 1,
1513 rows: Mutex::new(VecDeque::from([
1514 vec![
1515 Record::from([
1516 (String::from("id"), Value::U64(1)),
1517 (String::from("version"), Value::I64(1)),
1518 (String::from("name"), Value::Text(String::from("first"))),
1519 ]),
1520 Record::from([
1521 (String::from("id"), Value::U64(2)),
1522 (String::from("version"), Value::I64(1)),
1523 (String::from("name"), Value::Text(String::from("second"))),
1524 ]),
1525 ],
1526 vec![Record::from([
1527 (String::from("order_id"), Value::U64(1)),
1528 (String::from("lineCount"), Value::I64(3)),
1529 ])],
1530 ])),
1531 queries: Mutex::new(Vec::new()),
1532 };
1533 let mut ctx = UserContext::new()
1534 .with_metadata(
1535 InMemoryMetadataStore::new()
1536 .with_entity(entity())
1537 .with_entity(line_entity()),
1538 )
1539 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1540 ctx.insert_resource(PostgresDialect);
1541 ctx.insert_resource(executor);
1542
1543 let repo = ctx
1544 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1545 .unwrap();
1546 let rows = repo
1547 .fetch_all_with_relation_aggregates(
1548 &repo
1549 .select()
1550 .project("id")
1551 .project("version")
1552 .project("name"),
1553 &[RelationAggregate::new(
1554 "lines",
1555 "lineCount",
1556 SelectQuery::new("OrderLine"),
1557 true,
1558 )],
1559 )
1560 .unwrap();
1561
1562 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1563 assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));
1564
1565 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1566 let queries = executor.queries.lock().unwrap();
1567 assert_eq!(queries.len(), 2);
1568 assert_eq!(
1569 queries[1],
1570 "SELECT \"order_id\", COUNT(*) AS \"lineCount\" FROM \"orderline\" WHERE (\"order_id\" IN ($1, $2)) GROUP BY \"order_id\""
1571 );
1572 }
1573
1574 #[test]
1575 fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
1576 let executor = QueueExecutor {
1577 affected: 1,
1578 rows: Mutex::new(VecDeque::from([vec![Record::from([(
1579 String::from("count"),
1580 Value::I64(2),
1581 )])]])),
1582 queries: Mutex::new(Vec::new()),
1583 };
1584 let mut ctx = UserContext::new()
1585 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1586 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1587 ctx.insert_resource(PostgresDialect);
1588 ctx.insert_resource(executor);
1589 ctx.insert_resource(InMemoryAggregationCache::default());
1590
1591 let repo = ctx
1592 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1593 .unwrap();
1594 let query = repo
1595 .select()
1596 .count("count")
1597 .enable_aggregation_cache_for(60_000);
1598
1599 let first = repo.fetch_all(&query).unwrap();
1600 let second = repo.fetch_all(&query).unwrap();
1601
1602 assert_eq!(first, second);
1603 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1604 assert_eq!(executor.queries.lock().unwrap().len(), 1);
1605 }
1606
1607 #[test]
1608 fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
1609 let executor = QueueExecutor {
1610 affected: 1,
1611 rows: Mutex::new(VecDeque::from([
1612 vec![Record::from([(String::from("count"), Value::I64(2))])],
1613 vec![Record::from([(String::from("count"), Value::I64(3))])],
1614 ])),
1615 queries: Mutex::new(Vec::new()),
1616 };
1617 let mut ctx = UserContext::new()
1618 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1619 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1620 ctx.insert_resource(PostgresDialect);
1621 ctx.insert_resource(executor);
1622 ctx.insert_resource(
1623 Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"))
1624 as Arc<dyn AggregationCacheBackend>,
1625 );
1626
1627 let repo = ctx
1628 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1629 .unwrap();
1630 let query = repo
1631 .select()
1632 .count("count")
1633 .enable_aggregation_cache_for(60_000);
1634
1635 let first = repo.fetch_all(&query).unwrap();
1636 let cached = repo.fetch_all(&query).unwrap();
1637 repo.insert(
1638 &InsertCommand::new("Order")
1639 .value("id", 9_u64)
1640 .value("version", 1_i64)
1641 .value("name", "new"),
1642 )
1643 .unwrap();
1644 let refreshed = repo.fetch_all(&query).unwrap();
1645
1646 assert_eq!(first, cached);
1647 assert_ne!(cached, refreshed);
1648 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1649 assert_eq!(executor.queries.lock().unwrap().len(), 2);
1650 }
1651
1652 #[test]
1653 fn aggregation_cache_propagates_to_relation_aggregates() {
1654 let parent_rows = vec![
1655 Record::from([
1656 (String::from("id"), Value::U64(1)),
1657 (String::from("version"), Value::I64(1)),
1658 (String::from("name"), Value::Text(String::from("first"))),
1659 ]),
1660 Record::from([
1661 (String::from("id"), Value::U64(2)),
1662 (String::from("version"), Value::I64(1)),
1663 (String::from("name"), Value::Text(String::from("second"))),
1664 ]),
1665 ];
1666 let aggregate_rows = vec![Record::from([
1667 (String::from("order_id"), Value::U64(1)),
1668 (String::from("lineCount"), Value::I64(3)),
1669 ])];
1670 let executor = QueueExecutor {
1671 affected: 1,
1672 rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
1673 queries: Mutex::new(Vec::new()),
1674 };
1675 let mut ctx = UserContext::new()
1676 .with_metadata(
1677 InMemoryMetadataStore::new()
1678 .with_entity(entity())
1679 .with_entity(line_entity()),
1680 )
1681 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1682 ctx.insert_resource(PostgresDialect);
1683 ctx.insert_resource(executor);
1684 ctx.insert_resource(InMemoryAggregationCache::default());
1685
1686 let repo = ctx
1687 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1688 .unwrap();
1689 let query = repo
1690 .select()
1691 .project("id")
1692 .project("version")
1693 .project("name")
1694 .enable_aggregation_cache_for(60_000)
1695 .propagate_aggregation_cache(60_000);
1696 let aggregate =
1697 RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
1698
1699 let first = repo
1700 .fetch_all_with_relation_aggregates(&query, &[aggregate.clone()])
1701 .unwrap();
1702 let second = repo
1703 .fetch_all_with_relation_aggregates(&query, &[aggregate])
1704 .unwrap();
1705
1706 assert_eq!(first, second);
1707 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1708 assert_eq!(executor.queries.lock().unwrap().len(), 2);
1709 }
1710
1711 #[test]
1712 fn memory_repository_fetches_smart_list_entities_with_query_features() {
1713 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1714 let repository = MemoryRepository::new(metadata).with_rows(
1715 "Order",
1716 vec![
1717 Record::from([
1718 (String::from("id"), Value::U64(1)),
1719 (String::from("version"), Value::I64(1)),
1720 (String::from("name"), Value::Text(String::from("alpha"))),
1721 ]),
1722 Record::from([
1723 (String::from("id"), Value::U64(2)),
1724 (String::from("version"), Value::I64(1)),
1725 (String::from("name"), Value::Text(String::from("beta"))),
1726 ]),
1727 Record::from([
1728 (String::from("id"), Value::U64(3)),
1729 (String::from("version"), Value::I64(1)),
1730 (String::from("name"), Value::Text(String::from("gamma"))),
1731 ]),
1732 ],
1733 );
1734
1735 let query = teaql_core::SelectQuery::new("Order")
1736 .filter(Expr::Binary {
1737 left: Box::new(Expr::column("id")),
1738 op: teaql_core::BinaryOp::Gte,
1739 right: Box::new(Expr::value(2_u64)),
1740 })
1741 .order_by(OrderBy::desc("id"))
1742 .limit(1);
1743
1744 let orders = repository.fetch_entities::<Order>(&query).unwrap();
1745
1746 assert_eq!(orders.ids(), vec![Value::U64(3)]);
1747 assert_eq!(orders.versions(), vec![1]);
1748 assert_eq!(orders.first().unwrap().name, "gamma");
1749 }
1750
1751 #[test]
1752 fn memory_repository_runs_aggregates() {
1753 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1754 let repository = MemoryRepository::new(metadata).with_rows(
1755 "Order",
1756 vec![
1757 Record::from([
1758 (String::from("id"), Value::U64(1)),
1759 (String::from("version"), Value::I64(1)),
1760 (String::from("name"), Value::Text(String::from("alpha"))),
1761 ]),
1762 Record::from([
1763 (String::from("id"), Value::U64(2)),
1764 (String::from("version"), Value::I64(2)),
1765 (String::from("name"), Value::Text(String::from("beta"))),
1766 ]),
1767 ],
1768 );
1769
1770 let query = teaql_core::SelectQuery {
1771 entity: String::from("Order"),
1772 projection: Vec::new(),
1773 expr_projection: Vec::new(),
1774 filter: None,
1775 having: None,
1776 order_by: Vec::new(),
1777 slice: None,
1778 aggregates: vec![
1779 Aggregate {
1780 function: AggregateFunction::Count,
1781 field: String::from("id"),
1782 alias: String::from("count"),
1783 },
1784 Aggregate {
1785 function: AggregateFunction::Sum,
1786 field: String::from("version"),
1787 alias: String::from("versionSum"),
1788 },
1789 ],
1790 group_by: Vec::new(),
1791 relations: Vec::new(),
1792 aggregation_cache: None,
1793 comment: None,
1794 raw_sql: None,
1795 raw_sql_search_criteria: Vec::new(),
1796 json_expr: None,
1797 dynamic_properties: Vec::new(),
1798 raw_projections: Vec::new(),
1799 object_group_bys: Vec::new(),
1800 child_enhancements: Vec::new(),
1801 };
1802
1803 let rows = repository.fetch_all(&query).unwrap();
1804
1805 assert_eq!(rows.len(), 1);
1806 assert_eq!(rows[0].get("count"), Some(&Value::U64(2)));
1807 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
1808 }
1809
1810 #[test]
1811 fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
1812 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1813 let repository = MemoryRepository::new(metadata).with_rows(
1814 "Order",
1815 vec![
1816 Record::from([
1817 (String::from("id"), Value::U64(1)),
1818 (String::from("version"), Value::I64(1)),
1819 (String::from("name"), Value::Text(String::from("alpha"))),
1820 ]),
1821 Record::from([
1822 (String::from("id"), Value::U64(2)),
1823 (String::from("version"), Value::I64(2)),
1824 (String::from("name"), Value::Text(String::from("alpha"))),
1825 ]),
1826 Record::from([
1827 (String::from("id"), Value::U64(3)),
1828 (String::from("version"), Value::I64(3)),
1829 (String::from("name"), Value::Text(String::from("tmp-beta"))),
1830 ]),
1831 ],
1832 );
1833
1834 let rows = repository
1835 .fetch_all(
1836 &teaql_core::SelectQuery::new("Order")
1837 .filter(
1838 Expr::between("version", 1_i64, 3_i64)
1839 .and_expr(Expr::not_like("name", "tmp%"))
1840 .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")])),
1841 )
1842 .group_by("name")
1843 .count("total")
1844 .sum("version", "versionSum"),
1845 )
1846 .unwrap();
1847
1848 assert_eq!(rows.len(), 1);
1849 assert_eq!(
1850 rows[0].get("name"),
1851 Some(&Value::Text(String::from("alpha")))
1852 );
1853 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
1854 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
1855 }
1856
1857 #[test]
1858 fn memory_repository_runs_extended_aggregates_and_having() {
1859 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1860 let repository = MemoryRepository::new(metadata).with_rows(
1861 "Order",
1862 vec![
1863 Record::from([
1864 (String::from("id"), Value::U64(1)),
1865 (String::from("version"), Value::I64(1)),
1866 (String::from("name"), Value::Text(String::from("alpha"))),
1867 ]),
1868 Record::from([
1869 (String::from("id"), Value::U64(2)),
1870 (String::from("version"), Value::I64(3)),
1871 (String::from("name"), Value::Text(String::from("alpha"))),
1872 ]),
1873 Record::from([
1874 (String::from("id"), Value::U64(3)),
1875 (String::from("version"), Value::I64(7)),
1876 (String::from("name"), Value::Text(String::from("beta"))),
1877 ]),
1878 ],
1879 );
1880
1881 let rows = repository
1882 .fetch_all(
1883 &teaql_core::SelectQuery::new("Order")
1884 .group_by("name")
1885 .count("total")
1886 .stddev("version", "stddevVersion")
1887 .var_pop("version", "varPopVersion")
1888 .bit_or("version", "bitOrVersion")
1889 .having(Expr::gt("total", 1_i64)),
1890 )
1891 .unwrap();
1892
1893 assert_eq!(rows.len(), 1);
1894 assert_eq!(
1895 rows[0].get("name"),
1896 Some(&Value::Text(String::from("alpha")))
1897 );
1898 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
1899 assert_eq!(
1900 rows[0].get("stddevVersion").map(Value::to_json_value),
1901 Some(serde_json::Value::String(
1902 "1.4142135623730951454746218583".to_owned()
1903 ))
1904 );
1905 assert_eq!(
1906 rows[0].get("varPopVersion"),
1907 Some(&Value::Decimal(Decimal::ONE))
1908 );
1909 assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(3)));
1910 }
1911
1912 #[test]
1913 fn memory_repository_runs_sound_like_filter() {
1914 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1915 let repository = MemoryRepository::new(metadata).with_rows(
1916 "Order",
1917 vec![
1918 Record::from([
1919 (String::from("id"), Value::U64(1)),
1920 (String::from("version"), Value::I64(1)),
1921 (String::from("name"), Value::Text(String::from("Robert"))),
1922 ]),
1923 Record::from([
1924 (String::from("id"), Value::U64(2)),
1925 (String::from("version"), Value::I64(1)),
1926 (String::from("name"), Value::Text(String::from("Rupert"))),
1927 ]),
1928 Record::from([
1929 (String::from("id"), Value::U64(3)),
1930 (String::from("version"), Value::I64(1)),
1931 (String::from("name"), Value::Text(String::from("Ashcraft"))),
1932 ]),
1933 ],
1934 );
1935
1936 let rows = repository
1937 .fetch_all(
1938 &teaql_core::SelectQuery::new("Order")
1939 .filter(Expr::sound_like("name", "Robert"))
1940 .order_asc("id"),
1941 )
1942 .unwrap();
1943
1944 assert_eq!(rows.len(), 2);
1945 assert_eq!(rows[0].get("name"), Some(&Value::Text("Robert".to_owned())));
1946 assert_eq!(rows[1].get("name"), Some(&Value::Text("Rupert".to_owned())));
1947 }
1948
1949 #[test]
1950 fn memory_repository_runs_java_style_string_match_filters() {
1951 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1952 let repository = MemoryRepository::new(metadata).with_rows(
1953 "Order",
1954 vec![
1955 Record::from([
1956 (String::from("id"), Value::U64(1)),
1957 (String::from("version"), Value::I64(1)),
1958 (String::from("name"), Value::Text(String::from("tea-order"))),
1959 ]),
1960 Record::from([
1961 (String::from("id"), Value::U64(2)),
1962 (String::from("version"), Value::I64(1)),
1963 (
1964 String::from("name"),
1965 Value::Text(String::from("coffee-order")),
1966 ),
1967 ]),
1968 Record::from([
1969 (String::from("id"), Value::U64(3)),
1970 (String::from("version"), Value::I64(1)),
1971 (
1972 String::from("name"),
1973 Value::Text(String::from("tea-archived")),
1974 ),
1975 ]),
1976 ],
1977 );
1978
1979 let rows = repository
1980 .fetch_all(
1981 &teaql_core::SelectQuery::new("Order")
1982 .filter(
1983 Expr::contain("name", "tea")
1984 .and_expr(Expr::begin_with("name", "tea"))
1985 .and_expr(Expr::end_with("name", "order"))
1986 .and_expr(Expr::not_contain("name", "coffee"))
1987 .and_expr(Expr::not_begin_with("name", "archived"))
1988 .and_expr(Expr::not_end_with("name", "draft")),
1989 )
1990 .order_asc("id"),
1991 )
1992 .unwrap();
1993
1994 assert_eq!(rows.len(), 1);
1995 assert_eq!(
1996 rows[0].get("name"),
1997 Some(&Value::Text("tea-order".to_owned()))
1998 );
1999 }
2000
2001 #[test]
2002 fn memory_repository_runs_property_to_property_filters() {
2003 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2004 let repository = MemoryRepository::new(metadata).with_rows(
2005 "Order",
2006 vec![
2007 Record::from([
2008 (String::from("id"), Value::U64(1)),
2009 (String::from("version"), Value::I64(2)),
2010 (String::from("name"), Value::Text(String::from("keep"))),
2011 ]),
2012 Record::from([
2013 (String::from("id"), Value::U64(2)),
2014 (String::from("version"), Value::I64(1)),
2015 (String::from("name"), Value::Text(String::from("skip"))),
2016 ]),
2017 ],
2018 );
2019
2020 let rows = repository
2021 .fetch_all(
2022 &teaql_core::SelectQuery::new("Order")
2023 .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
2024 .order_asc("id"),
2025 )
2026 .unwrap();
2027
2028 assert_eq!(rows.len(), 1);
2029 assert_eq!(rows[0].get("name"), Some(&Value::Text("keep".to_owned())));
2030 }
2031
2032 #[test]
2033 fn memory_repository_supports_mutations_and_optimistic_locking() {
2034 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2035 let repository = MemoryRepository::new(metadata);
2036
2037 repository
2038 .insert(
2039 &InsertCommand::new("Order")
2040 .value("id", 10_u64)
2041 .value("version", 1_i64)
2042 .value("name", "draft"),
2043 )
2044 .unwrap();
2045 repository
2046 .update(
2047 &UpdateCommand::new("Order", 10_u64)
2048 .expected_version(1)
2049 .value("name", "submitted"),
2050 )
2051 .unwrap();
2052
2053 let row = repository
2054 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2055 .unwrap()
2056 .pop()
2057 .unwrap();
2058 assert_eq!(
2059 row.get("name"),
2060 Some(&Value::Text(String::from("submitted")))
2061 );
2062 assert_eq!(row.get("version"), Some(&Value::I64(2)));
2063
2064 let conflict = repository
2065 .update(
2066 &UpdateCommand::new("Order", 10_u64)
2067 .expected_version(1)
2068 .value("name", "stale"),
2069 )
2070 .unwrap_err();
2071 assert!(matches!(
2072 conflict,
2073 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
2074 ));
2075
2076 repository
2077 .delete(&DeleteCommand::new("Order", 10_u64).expected_version(2))
2078 .unwrap();
2079 let row = repository
2080 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2081 .unwrap()
2082 .pop()
2083 .unwrap();
2084 assert_eq!(row.get("version"), Some(&Value::I64(-3)));
2085
2086 repository
2087 .recover(&RecoverCommand::new("Order", 10_u64, -3))
2088 .unwrap();
2089 let row = repository
2090 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2091 .unwrap()
2092 .pop()
2093 .unwrap();
2094 assert_eq!(row.get("version"), Some(&Value::I64(4)));
2095 }
2096}
2097
2098#[cfg(all(test, feature = "sqlx"))]
2099mod sqlx_integration_tests {
2100 use super::sqlx_support::{
2101 MutationExecutorError, PgIdSpaceGenerator, PgMutationExecutor, PgTransactionExecutor,
2102 SqliteIdSpaceGenerator, SqliteMutationExecutor,
2103 };
2104 use super::{
2105 GraphMutationKind, GraphNode, GraphTransactionBoundary, InMemoryMetadataStore,
2106 InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry, QueryExecutor,
2107 RepositoryBehavior, UserContext,
2108 };
2109 use chrono::{NaiveDate, TimeZone, Utc};
2110 use teaql_core::{
2111 DataType, Decimal, DeleteCommand, EntityDescriptor, Expr, InsertCommand,
2112 PropertyDescriptor, Record, RecoverCommand, SelectQuery, UpdateCommand, Value,
2113 };
2114 use teaql_dialect_pg::PostgresDialect;
2115 use teaql_dialect_sqlite::SqliteDialect;
2116 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
2117 use teaql_sql::SqlDialect;
2118 use tokio::runtime::Handle;
2119 use tokio::task::block_in_place;
2120
2121 fn entity() -> EntityDescriptor {
2122 EntityDescriptor::new("Order")
2123 .table_name("orders")
2124 .property(
2125 PropertyDescriptor::new("id", DataType::U64)
2126 .column_name("id")
2127 .id()
2128 .not_null(),
2129 )
2130 .property(
2131 PropertyDescriptor::new("version", DataType::I64)
2132 .column_name("version")
2133 .version()
2134 .not_null(),
2135 )
2136 .property(
2137 PropertyDescriptor::new("name", DataType::Text)
2138 .column_name("name")
2139 .not_null(),
2140 )
2141 .relation(
2142 teaql_core::RelationDescriptor::new("lines", "OrderLine")
2143 .local_key("id")
2144 .foreign_key("order_id")
2145 .many(),
2146 )
2147 }
2148
2149 fn sqlite_entity_keep_missing() -> EntityDescriptor {
2150 EntityDescriptor::new("Order")
2151 .table_name("orders")
2152 .property(
2153 PropertyDescriptor::new("id", DataType::U64)
2154 .column_name("id")
2155 .id()
2156 .not_null(),
2157 )
2158 .property(
2159 PropertyDescriptor::new("version", DataType::I64)
2160 .column_name("version")
2161 .version()
2162 .not_null(),
2163 )
2164 .property(
2165 PropertyDescriptor::new("name", DataType::Text)
2166 .column_name("name")
2167 .not_null(),
2168 )
2169 .relation(
2170 teaql_core::RelationDescriptor::new("lines", "OrderLine")
2171 .local_key("id")
2172 .foreign_key("order_id")
2173 .many()
2174 .keep_missing(),
2175 )
2176 }
2177
2178 fn line_entity() -> EntityDescriptor {
2179 EntityDescriptor::new("OrderLine")
2180 .table_name("orderline")
2181 .property(
2182 PropertyDescriptor::new("id", DataType::U64)
2183 .column_name("id")
2184 .id()
2185 .not_null(),
2186 )
2187 .property(
2188 PropertyDescriptor::new("version", DataType::I64)
2189 .column_name("version")
2190 .version(),
2191 )
2192 .property(
2193 PropertyDescriptor::new("order_id", DataType::U64)
2194 .column_name("order_id")
2195 .not_null(),
2196 )
2197 .property(
2198 PropertyDescriptor::new("name", DataType::Text)
2199 .column_name("name")
2200 .not_null(),
2201 )
2202 .property(
2203 PropertyDescriptor::new("product_id", DataType::U64)
2204 .column_name("product_id")
2205 .not_null(),
2206 )
2207 .relation(
2208 teaql_core::RelationDescriptor::new("product", "Product")
2209 .local_key("product_id")
2210 .foreign_key("id"),
2211 )
2212 }
2213
2214 fn product_entity() -> EntityDescriptor {
2215 EntityDescriptor::new("Product")
2216 .table_name("product")
2217 .property(
2218 PropertyDescriptor::new("id", DataType::U64)
2219 .column_name("id")
2220 .id()
2221 .not_null(),
2222 )
2223 .property(
2224 PropertyDescriptor::new("name", DataType::Text)
2225 .column_name("name")
2226 .not_null(),
2227 )
2228 }
2229
2230 fn typed_entity() -> EntityDescriptor {
2231 EntityDescriptor::new("TypedValue")
2232 .table_name("typed_value")
2233 .property(
2234 PropertyDescriptor::new("id", DataType::U64)
2235 .column_name("id")
2236 .id()
2237 .not_null(),
2238 )
2239 .property(
2240 PropertyDescriptor::new("payload", DataType::Json)
2241 .column_name("payload")
2242 .not_null(),
2243 )
2244 .property(
2245 PropertyDescriptor::new("amount", DataType::Decimal)
2246 .column_name("amount")
2247 .not_null(),
2248 )
2249 .property(
2250 PropertyDescriptor::new("birthday", DataType::Date)
2251 .column_name("birthday")
2252 .not_null(),
2253 )
2254 .property(
2255 PropertyDescriptor::new("happened_at", DataType::Timestamp)
2256 .column_name("happened_at")
2257 .not_null(),
2258 )
2259 }
2260
2261 struct OrderBehavior;
2262 struct NestedOrderBehavior;
2263
2264 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2265 #[teaql(entity = "Product", table = "product")]
2266 struct ProductEntityRow {
2267 #[teaql(id)]
2268 id: u64,
2269 name: String,
2270 }
2271
2272 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2273 #[teaql(entity = "OrderLine", table = "orderline")]
2274 struct OrderLineEntityRow {
2275 #[teaql(id)]
2276 id: u64,
2277 #[teaql(column = "order_id")]
2278 order_id: u64,
2279 name: String,
2280 #[teaql(column = "product_id")]
2281 product_id: u64,
2282 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
2283 product: Option<ProductEntityRow>,
2284 }
2285
2286 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2287 #[teaql(entity = "Order", table = "orders")]
2288 struct OrderAggregateRow {
2289 #[teaql(id)]
2290 id: u64,
2291 #[teaql(version)]
2292 version: i64,
2293 name: String,
2294 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
2295 lines: teaql_core::SmartList<OrderLineEntityRow>,
2296 }
2297
2298 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2299 #[teaql(entity = "Order", table = "orders")]
2300 struct Order {
2301 #[teaql(id)]
2302 id: u64,
2303 #[teaql(version)]
2304 version: i64,
2305 name: String,
2306 }
2307
2308 impl RepositoryBehavior for OrderBehavior {
2309 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2310 vec!["lines".to_owned()]
2311 }
2312 }
2313
2314 impl RepositoryBehavior for NestedOrderBehavior {
2315 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2316 vec!["lines.product".to_owned()]
2317 }
2318 }
2319
2320 #[derive(Clone)]
2321 struct SqliteSyncExecutor {
2322 inner: SqliteMutationExecutor,
2323 }
2324
2325 impl SqliteSyncExecutor {
2326 fn new(inner: SqliteMutationExecutor) -> Self {
2327 Self { inner }
2328 }
2329 }
2330
2331 impl QueryExecutor for SqliteSyncExecutor {
2332 type Error = MutationExecutorError;
2333
2334 fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2335 let handle = Handle::current();
2336 block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2337 }
2338
2339 fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2340 let handle = Handle::current();
2341 block_in_place(|| handle.block_on(self.inner.execute(query)))
2342 }
2343
2344 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2345 let handle = Handle::current();
2346 block_in_place(|| handle.block_on(self.inner.begin_transaction()))?;
2347 Ok(GraphTransactionBoundary::Started)
2348 }
2349
2350 fn commit_transaction(&self) -> Result<(), Self::Error> {
2351 let handle = Handle::current();
2352 block_in_place(|| handle.block_on(self.inner.commit_transaction()))
2353 }
2354
2355 fn rollback_transaction(&self) -> Result<(), Self::Error> {
2356 let handle = Handle::current();
2357 block_in_place(|| handle.block_on(self.inner.rollback_transaction()))
2358 }
2359 }
2360
2361 #[derive(Clone)]
2362 struct PgSyncExecutor {
2363 inner: PgMutationExecutor,
2364 }
2365
2366 impl PgSyncExecutor {
2367 fn new(inner: PgMutationExecutor) -> Self {
2368 Self { inner }
2369 }
2370 }
2371
2372 impl QueryExecutor for PgSyncExecutor {
2373 type Error = MutationExecutorError;
2374
2375 fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2376 let handle = Handle::current();
2377 block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2378 }
2379
2380 fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2381 let handle = Handle::current();
2382 block_in_place(|| handle.block_on(self.inner.execute(query)))
2383 }
2384 }
2385
2386 #[derive(Clone)]
2387 struct PgTxSyncExecutor {
2388 inner: PgTransactionExecutor,
2389 }
2390
2391 impl PgTxSyncExecutor {
2392 fn new(inner: PgTransactionExecutor) -> Self {
2393 Self { inner }
2394 }
2395 }
2396
2397 impl QueryExecutor for PgTxSyncExecutor {
2398 type Error = MutationExecutorError;
2399
2400 fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2401 let handle = Handle::current();
2402 block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2403 }
2404
2405 fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2406 let handle = Handle::current();
2407 block_in_place(|| handle.block_on(self.inner.execute(query)))
2408 }
2409
2410 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2411 Ok(GraphTransactionBoundary::AlreadyActive)
2412 }
2413
2414 fn rollback_transaction(&self) -> Result<(), Self::Error> {
2415 let handle = Handle::current();
2416 block_in_place(|| handle.block_on(self.inner.rollback()))
2417 }
2418 }
2419
2420 #[tokio::test]
2421 async fn sqlite_executor_runs_crud_flow() {
2422 use sqlx::sqlite::SqlitePoolOptions;
2423
2424 let pool = SqlitePoolOptions::new()
2425 .max_connections(1)
2426 .connect("sqlite::memory:")
2427 .await
2428 .unwrap();
2429
2430 let executor = SqliteMutationExecutor::new(pool.clone());
2431 let dialect = SqliteDialect;
2432 let entity = entity();
2433 executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2434
2435 let insert = dialect
2436 .compile_insert(
2437 &entity,
2438 &InsertCommand::new("Order")
2439 .value("id", 1_u64)
2440 .value("version", 1_i64)
2441 .value("name", "first"),
2442 )
2443 .unwrap();
2444 assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2445
2446 let select = dialect
2447 .compile_select(
2448 &entity,
2449 &SelectQuery::new("Order")
2450 .project("id")
2451 .project("version")
2452 .project("name")
2453 .filter(Expr::eq("id", 1_u64)),
2454 )
2455 .unwrap();
2456 let rows = executor.fetch_all(&select).await.unwrap();
2457 assert_eq!(rows.len(), 1);
2458 assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
2459 assert_eq!(rows[0].get("version"), Some(&Value::I64(1)));
2460 assert_eq!(rows[0].get("name"), Some(&Value::Text("first".to_owned())));
2461
2462 let update = dialect
2463 .compile_update(
2464 &entity,
2465 &UpdateCommand::new("Order", 1_u64)
2466 .expected_version(1)
2467 .value("name", "second"),
2468 )
2469 .unwrap();
2470 assert_eq!(executor.execute(&update).await.unwrap(), 1);
2471
2472 let after_update = executor.fetch_all(&select).await.unwrap();
2473 assert_eq!(after_update[0].get("version"), Some(&Value::I64(2)));
2474 assert_eq!(
2475 after_update[0].get("name"),
2476 Some(&Value::Text("second".to_owned()))
2477 );
2478
2479 let delete = dialect
2480 .compile_delete(
2481 &entity,
2482 &DeleteCommand::new("Order", 1_u64).expected_version(2),
2483 )
2484 .unwrap();
2485 assert_eq!(executor.execute(&delete).await.unwrap(), 1);
2486
2487 let after_delete = executor.fetch_all(&select).await.unwrap();
2488 assert_eq!(after_delete[0].get("version"), Some(&Value::I64(-3)));
2489
2490 let recover = dialect
2491 .compile_recover(&entity, &RecoverCommand::new("Order", 1_u64, -3))
2492 .unwrap();
2493 assert_eq!(executor.execute(&recover).await.unwrap(), 1);
2494
2495 let after_recover = executor.fetch_all(&select).await.unwrap();
2496 assert_eq!(after_recover[0].get("version"), Some(&Value::I64(4)));
2497 }
2498
2499 #[tokio::test(flavor = "multi_thread")]
2500 async fn sqlite_executor_enhances_relations() {
2501 use sqlx::sqlite::SqlitePoolOptions;
2502
2503 let pool = SqlitePoolOptions::new()
2504 .max_connections(1)
2505 .connect("sqlite::memory:")
2506 .await
2507 .unwrap();
2508
2509 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2510 let order = entity();
2511 let line = line_entity();
2512 mutation_executor
2513 .ensure_schema(&SqliteDialect, &[&order, &line])
2514 .await
2515 .unwrap();
2516
2517 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1'), (2, 1, 'o2')")
2518 .execute(&pool)
2519 .await
2520 .unwrap();
2521 sqlx::query(
2522 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2523 (101, 1, 1001, 'l1'),
2524 (102, 1, 1002, 'l2'),
2525 (201, 2, 1003, 'l3')",
2526 )
2527 .execute(&pool)
2528 .await
2529 .unwrap();
2530
2531 let executor = SqliteSyncExecutor::new(mutation_executor);
2532 let mut ctx = UserContext::new()
2533 .with_metadata(
2534 InMemoryMetadataStore::new()
2535 .with_entity(order)
2536 .with_entity(line)
2537 .with_entity(product_entity()),
2538 )
2539 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2540 .with_repository_behavior_registry(
2541 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
2542 );
2543 ctx.insert_resource(SqliteDialect);
2544 ctx.insert_resource(executor);
2545
2546 let repo = ctx
2547 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2548 .unwrap();
2549 let mut parents = repo
2550 .fetch_all(
2551 &repo
2552 .select()
2553 .project("id")
2554 .project("version")
2555 .project("name")
2556 .order_by(teaql_core::OrderBy::asc("id")),
2557 )
2558 .unwrap();
2559
2560 repo.enhance_relations(&mut parents).unwrap();
2561
2562 assert_eq!(parents.len(), 2);
2563 match parents[0].get("lines") {
2564 Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
2565 other => panic!("unexpected first lines payload: {other:?}"),
2566 }
2567 match parents[1].get("lines") {
2568 Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
2569 other => panic!("unexpected second lines payload: {other:?}"),
2570 }
2571 }
2572
2573 #[tokio::test(flavor = "multi_thread")]
2574 async fn sqlite_executor_enhances_nested_relations() {
2575 use sqlx::sqlite::SqlitePoolOptions;
2576
2577 let pool = SqlitePoolOptions::new()
2578 .max_connections(1)
2579 .connect("sqlite::memory:")
2580 .await
2581 .unwrap();
2582
2583 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2584 let order = entity();
2585 let line = line_entity();
2586 let product = product_entity();
2587 mutation_executor
2588 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2589 .await
2590 .unwrap();
2591
2592 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2593 .execute(&pool)
2594 .await
2595 .unwrap();
2596 sqlx::query(
2597 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2598 (101, 1, 1001, 'l1'),
2599 (102, 1, 1002, 'l2')",
2600 )
2601 .execute(&pool)
2602 .await
2603 .unwrap();
2604 sqlx::query(
2605 "INSERT INTO product (id, name) VALUES
2606 (1001, 'p1'),
2607 (1002, 'p2')",
2608 )
2609 .execute(&pool)
2610 .await
2611 .unwrap();
2612
2613 let executor = SqliteSyncExecutor::new(mutation_executor);
2614 let mut ctx = UserContext::new()
2615 .with_metadata(
2616 InMemoryMetadataStore::new()
2617 .with_entity(order)
2618 .with_entity(line)
2619 .with_entity(product),
2620 )
2621 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2622 .with_repository_behavior_registry(
2623 InMemoryRepositoryBehaviorRegistry::new()
2624 .with_behavior("Order", NestedOrderBehavior),
2625 );
2626 ctx.insert_resource(SqliteDialect);
2627 ctx.insert_resource(executor);
2628
2629 let repo = ctx
2630 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2631 .unwrap();
2632 let mut parents = repo
2633 .fetch_all(
2634 &repo
2635 .select()
2636 .project("id")
2637 .project("version")
2638 .project("name"),
2639 )
2640 .unwrap();
2641
2642 repo.enhance_relations(&mut parents).unwrap();
2643
2644 match parents[0].get("lines") {
2645 Some(Value::List(lines)) => {
2646 assert_eq!(lines.len(), 2);
2647 for line in lines {
2648 match line {
2649 Value::Object(line_record) => match line_record.get("product") {
2650 Some(Value::Object(product)) => {
2651 assert!(product.get("name").is_some());
2652 }
2653 other => panic!("unexpected product payload: {other:?}"),
2654 },
2655 other => panic!("unexpected line payload: {other:?}"),
2656 }
2657 }
2658 }
2659 other => panic!("unexpected nested lines payload: {other:?}"),
2660 }
2661 }
2662
2663 #[tokio::test(flavor = "multi_thread")]
2664 async fn sqlite_executor_enhances_query_relation_with_child_query() {
2665 use sqlx::sqlite::SqlitePoolOptions;
2666
2667 let pool = SqlitePoolOptions::new()
2668 .max_connections(1)
2669 .connect("sqlite::memory:")
2670 .await
2671 .unwrap();
2672
2673 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2674 let order = entity();
2675 let line = line_entity();
2676 let product = product_entity();
2677 mutation_executor
2678 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2679 .await
2680 .unwrap();
2681
2682 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2683 .execute(&pool)
2684 .await
2685 .unwrap();
2686 sqlx::query(
2687 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2688 (101, 1, 1001, 'keep'),
2689 (102, 1, 1002, 'drop')",
2690 )
2691 .execute(&pool)
2692 .await
2693 .unwrap();
2694 sqlx::query(
2695 "INSERT INTO product (id, name) VALUES
2696 (1001, 'p1'),
2697 (1002, 'p2')",
2698 )
2699 .execute(&pool)
2700 .await
2701 .unwrap();
2702
2703 let executor = SqliteSyncExecutor::new(mutation_executor);
2704 let mut ctx = UserContext::new()
2705 .with_metadata(
2706 InMemoryMetadataStore::new()
2707 .with_entity(order)
2708 .with_entity(line)
2709 .with_entity(product),
2710 )
2711 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2712 ctx.insert_resource(SqliteDialect);
2713 ctx.insert_resource(executor);
2714
2715 let repo = ctx
2716 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2717 .unwrap();
2718 let query = repo.select().relation_query(
2719 "lines",
2720 SelectQuery::new("OrderLine")
2721 .project("name")
2722 .filter(Expr::eq("name", "keep"))
2723 .order_desc("id")
2724 .page(0, 10)
2725 .relation_query("product", SelectQuery::new("Product").project("name")),
2726 );
2727 let mut parents = repo
2728 .fetch_all(
2729 &repo
2730 .select()
2731 .project("id")
2732 .project("version")
2733 .project("name"),
2734 )
2735 .unwrap();
2736
2737 repo.enhance_query_relations(&mut parents, &query).unwrap();
2738
2739 match parents[0].get("lines") {
2740 Some(Value::List(lines)) => {
2741 assert_eq!(lines.len(), 1);
2742 let Value::Object(line) = &lines[0] else {
2743 panic!("unexpected line payload: {:?}", lines[0]);
2744 };
2745 assert_eq!(line.get("name"), Some(&Value::Text("keep".to_owned())));
2746 assert_eq!(line.get("order_id"), Some(&Value::I64(1)));
2747 assert_eq!(line.get("product_id"), Some(&Value::I64(1001)));
2748 match line.get("product") {
2749 Some(Value::Object(product)) => {
2750 assert_eq!(product.get("name"), Some(&Value::Text("p1".to_owned())));
2751 assert_eq!(product.get("id"), Some(&Value::I64(1001)));
2752 }
2753 other => panic!("unexpected product payload: {other:?}"),
2754 }
2755 }
2756 other => panic!("unexpected query relation payload: {other:?}"),
2757 }
2758 }
2759
2760 #[tokio::test]
2761 async fn sqlite_executor_ensure_schema_adds_missing_columns() {
2762 use sqlx::Row;
2763 use sqlx::sqlite::SqlitePoolOptions;
2764
2765 let pool = SqlitePoolOptions::new()
2766 .max_connections(1)
2767 .connect("sqlite::memory:")
2768 .await
2769 .unwrap();
2770
2771 sqlx::query(
2772 "CREATE TABLE orders (
2773 id INTEGER PRIMARY KEY
2774 )",
2775 )
2776 .execute(&pool)
2777 .await
2778 .unwrap();
2779
2780 let executor = SqliteMutationExecutor::new(pool.clone());
2781 let dialect = SqliteDialect;
2782 let entity = entity();
2783
2784 executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2785
2786 let columns = sqlx::query("PRAGMA table_info(\"orders\")")
2787 .fetch_all(&pool)
2788 .await
2789 .unwrap()
2790 .into_iter()
2791 .map(|row| row.try_get::<String, _>("name").unwrap())
2792 .collect::<Vec<_>>();
2793
2794 assert!(columns.contains(&"id".to_owned()));
2795 assert!(columns.contains(&"version".to_owned()));
2796 assert!(columns.contains(&"name".to_owned()));
2797 }
2798
2799 #[tokio::test]
2800 async fn user_context_can_ensure_sqlite_schema_from_runtime_module() {
2801 use sqlx::Row;
2802 use sqlx::sqlite::SqlitePoolOptions;
2803
2804 let pool = SqlitePoolOptions::new()
2805 .max_connections(1)
2806 .connect("sqlite::memory:")
2807 .await
2808 .unwrap();
2809
2810 let module = super::RuntimeModule::new()
2811 .descriptor(entity())
2812 .descriptor(line_entity())
2813 .descriptor(product_entity());
2814 let mut ctx = super::UserContext::new().with_module(module);
2815 ctx.insert_resource(SqliteDialect);
2816 ctx.insert_resource(SqliteMutationExecutor::new(pool.clone()));
2817
2818 ctx.ensure_sqlite_schema().await.unwrap();
2819
2820 let tables = sqlx::query(
2821 "SELECT name FROM sqlite_master WHERE type = 'table' AND name IN ('orders', 'orderline', 'product') ORDER BY name",
2822 )
2823 .fetch_all(&pool)
2824 .await
2825 .unwrap()
2826 .into_iter()
2827 .map(|row| row.try_get::<String, _>("name").unwrap())
2828 .collect::<Vec<_>>();
2829
2830 assert_eq!(
2831 tables,
2832 vec![
2833 "orderline".to_owned(),
2834 "orders".to_owned(),
2835 "product".to_owned()
2836 ]
2837 );
2838 }
2839
2840 #[tokio::test]
2841 async fn sqlite_executor_roundtrips_json_decimal_date_and_timestamp() {
2842 use sqlx::sqlite::SqlitePoolOptions;
2843
2844 let pool = SqlitePoolOptions::new()
2845 .max_connections(1)
2846 .connect("sqlite::memory:")
2847 .await
2848 .unwrap();
2849
2850 let executor = SqliteMutationExecutor::new(pool.clone());
2851 let dialect = SqliteDialect;
2852 let entity = typed_entity();
2853 executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2854
2855 let birthday = NaiveDate::from_ymd_opt(2024, 2, 3).unwrap();
2856 let happened_at = Utc.with_ymd_and_hms(2024, 2, 3, 4, 5, 6).unwrap();
2857 let payload = serde_json::json!({"name": "teaql", "count": 2});
2858 let amount = Decimal::new(12345, 2);
2859
2860 let insert = dialect
2861 .compile_insert(
2862 &entity,
2863 &InsertCommand::new("TypedValue")
2864 .value("id", 1_u64)
2865 .value("payload", payload.clone())
2866 .value("amount", amount)
2867 .value("birthday", birthday)
2868 .value("happened_at", happened_at),
2869 )
2870 .unwrap();
2871 assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2872
2873 let select = dialect
2874 .compile_select(
2875 &entity,
2876 &SelectQuery::new("TypedValue")
2877 .project("id")
2878 .project("payload")
2879 .project("amount")
2880 .project("birthday")
2881 .project("happened_at")
2882 .filter(Expr::eq("id", 1_u64)),
2883 )
2884 .unwrap();
2885 let rows = executor.fetch_all(&select).await.unwrap();
2886 assert_eq!(rows.len(), 1);
2887 assert_eq!(rows[0].get("payload"), Some(&Value::Json(payload)));
2888 assert_eq!(rows[0].get("amount"), Some(&Value::Decimal(amount)));
2889 assert_eq!(rows[0].get("birthday"), Some(&Value::Date(birthday)));
2890 assert_eq!(
2891 rows[0].get("happened_at"),
2892 Some(&Value::Timestamp(happened_at))
2893 );
2894 }
2895
2896 #[tokio::test(flavor = "multi_thread")]
2897 async fn sqlite_fetches_enhanced_typed_entities() {
2898 use sqlx::sqlite::SqlitePoolOptions;
2899
2900 let pool = SqlitePoolOptions::new()
2901 .max_connections(1)
2902 .connect("sqlite::memory:")
2903 .await
2904 .unwrap();
2905
2906 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2907 let order = entity();
2908 let line = line_entity();
2909 let product = product_entity();
2910 mutation_executor
2911 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2912 .await
2913 .unwrap();
2914
2915 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2916 .execute(&pool)
2917 .await
2918 .unwrap();
2919 sqlx::query(
2920 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2921 (101, 1, 1001, 'l1'),
2922 (102, 1, 1002, 'l2')",
2923 )
2924 .execute(&pool)
2925 .await
2926 .unwrap();
2927 sqlx::query(
2928 "INSERT INTO product (id, name) VALUES
2929 (1001, 'p1'),
2930 (1002, 'p2')",
2931 )
2932 .execute(&pool)
2933 .await
2934 .unwrap();
2935
2936 let executor = SqliteSyncExecutor::new(mutation_executor);
2937 let mut ctx = UserContext::new()
2938 .with_metadata(
2939 InMemoryMetadataStore::new()
2940 .with_entity(order)
2941 .with_entity(line)
2942 .with_entity(product),
2943 )
2944 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2945 .with_repository_behavior_registry(
2946 InMemoryRepositoryBehaviorRegistry::new()
2947 .with_behavior("Order", NestedOrderBehavior),
2948 );
2949 ctx.insert_resource(SqliteDialect);
2950 ctx.insert_resource(executor);
2951
2952 let repo = ctx
2953 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2954 .unwrap();
2955 let rows = repo
2956 .fetch_enhanced_entities::<OrderAggregateRow>(
2957 &repo
2958 .select()
2959 .project("id")
2960 .project("version")
2961 .project("name"),
2962 )
2963 .unwrap();
2964
2965 assert_eq!(rows.len(), 1);
2966 let order = rows.first().unwrap();
2967 assert_eq!(order.id, 1);
2968 assert_eq!(order.lines.len(), 2);
2969 assert_eq!(order.lines.data[0].product.as_ref().unwrap().name, "p1");
2970 assert_eq!(order.lines.data[1].product.as_ref().unwrap().name, "p2");
2971 }
2972
2973 #[tokio::test(flavor = "multi_thread")]
2974 async fn sqlite_fetches_typed_smart_list_entities() {
2975 use sqlx::sqlite::SqlitePoolOptions;
2976
2977 let pool = SqlitePoolOptions::new()
2978 .max_connections(1)
2979 .connect("sqlite::memory:")
2980 .await
2981 .unwrap();
2982
2983 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2984 let order = entity();
2985 mutation_executor
2986 .ensure_schema(&SqliteDialect, &[&order])
2987 .await
2988 .unwrap();
2989
2990 sqlx::query(
2991 "INSERT INTO orders (id, version, name) VALUES
2992 (1, 1, 'o1'),
2993 (2, 3, 'o2')",
2994 )
2995 .execute(&pool)
2996 .await
2997 .unwrap();
2998
2999 let executor = SqliteSyncExecutor::new(mutation_executor);
3000 let mut ctx = UserContext::new()
3001 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3002 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3003 ctx.insert_resource(SqliteDialect);
3004 ctx.insert_resource(executor);
3005
3006 let repo = ctx
3007 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3008 .unwrap();
3009 let rows = repo
3010 .fetch_entities::<OrderAggregateRow>(
3011 &repo
3012 .select()
3013 .project("id")
3014 .project("version")
3015 .project("name"),
3016 )
3017 .unwrap();
3018
3019 assert_eq!(rows.len(), 2);
3020 assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3021 assert_eq!(rows.versions(), vec![1, 3]);
3022 assert!(rows.data[0].lines.is_empty());
3023 assert!(rows.data[1].lines.is_empty());
3024 }
3025
3026 #[tokio::test(flavor = "multi_thread")]
3027 async fn sqlite_fetches_smart_list_of_order_entities() {
3028 use sqlx::sqlite::SqlitePoolOptions;
3029
3030 let pool = SqlitePoolOptions::new()
3031 .max_connections(1)
3032 .connect("sqlite::memory:")
3033 .await
3034 .unwrap();
3035
3036 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3037 let order = entity();
3038 mutation_executor
3039 .ensure_schema(&SqliteDialect, &[&order])
3040 .await
3041 .unwrap();
3042
3043 sqlx::query(
3044 "INSERT INTO orders (id, version, name) VALUES
3045 (1, 1, 'o1'),
3046 (2, 3, 'o2')",
3047 )
3048 .execute(&pool)
3049 .await
3050 .unwrap();
3051
3052 let executor = SqliteSyncExecutor::new(mutation_executor);
3053 let mut ctx = UserContext::new()
3054 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3055 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3056 ctx.insert_resource(SqliteDialect);
3057 ctx.insert_resource(executor);
3058
3059 let repo = ctx
3060 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3061 .unwrap();
3062 let rows = repo
3063 .fetch_entities::<Order>(
3064 &repo
3065 .select()
3066 .project("id")
3067 .project("version")
3068 .project("name"),
3069 )
3070 .unwrap();
3071
3072 assert_eq!(
3073 rows.data,
3074 vec![
3075 Order {
3076 id: 1,
3077 version: 1,
3078 name: "o1".to_owned(),
3079 },
3080 Order {
3081 id: 2,
3082 version: 3,
3083 name: "o2".to_owned(),
3084 }
3085 ]
3086 );
3087 assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3088 assert_eq!(rows.versions(), vec![1, 3]);
3089 }
3090
3091 #[tokio::test(flavor = "multi_thread")]
3092 async fn sqlite_insert_generates_missing_id() {
3093 use sqlx::sqlite::SqlitePoolOptions;
3094
3095 let pool = SqlitePoolOptions::new()
3096 .max_connections(1)
3097 .connect("sqlite::memory:")
3098 .await
3099 .unwrap();
3100
3101 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3102 let order = entity();
3103 mutation_executor
3104 .ensure_schema(&SqliteDialect, &[&order])
3105 .await
3106 .unwrap();
3107
3108 let executor = SqliteSyncExecutor::new(mutation_executor);
3109 let mut ctx = UserContext::new()
3110 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3111 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3112 ctx.insert_resource(SqliteDialect);
3113 ctx.insert_resource(executor);
3114
3115 let repo = ctx
3116 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3117 .unwrap();
3118 let affected = repo
3119 .insert(
3120 &repo
3121 .insert_command()
3122 .value("version", 1_i64)
3123 .value("name", "generated"),
3124 )
3125 .unwrap();
3126 assert_eq!(affected, 1);
3127
3128 let rows = repo
3129 .fetch_entities::<Order>(
3130 &repo
3131 .select()
3132 .project("id")
3133 .project("version")
3134 .project("name")
3135 .filter(Expr::eq("name", "generated")),
3136 )
3137 .unwrap();
3138 assert_eq!(rows.len(), 1);
3139 let order = rows.first().unwrap();
3140 assert!(order.id > 0);
3141 assert_eq!(order.version, 1);
3142 assert_eq!(order.name, "generated");
3143 }
3144
3145 #[tokio::test(flavor = "multi_thread")]
3146 async fn sqlite_save_graph_inserts_nested_rows() {
3147 use sqlx::{Row, sqlite::SqlitePoolOptions};
3148
3149 let pool = SqlitePoolOptions::new()
3150 .max_connections(1)
3151 .connect("sqlite::memory:?cache=shared")
3152 .await
3153 .unwrap();
3154
3155 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3156 let order = entity();
3157 let line = line_entity();
3158 let product = product_entity();
3159 mutation_executor
3160 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3161 .await
3162 .unwrap();
3163
3164 let executor = SqliteSyncExecutor::new(mutation_executor);
3165 let mut ctx = UserContext::new()
3166 .with_metadata(
3167 InMemoryMetadataStore::new()
3168 .with_entity(order)
3169 .with_entity(line)
3170 .with_entity(product),
3171 )
3172 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3173 ctx.insert_resource(SqliteDialect);
3174 ctx.insert_resource(executor);
3175
3176 let repo = ctx
3177 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3178 .unwrap();
3179 let graph = GraphNode::new("Order")
3180 .value("id", 1_u64)
3181 .value("version", 1_i64)
3182 .value("name", "root")
3183 .relation(
3184 "lines",
3185 GraphNode::new("OrderLine")
3186 .value("id", 10_u64)
3187 .value("name", "line-1")
3188 .relation(
3189 "product",
3190 GraphNode::new("Product")
3191 .value("id", 100_u64)
3192 .value("name", "sku-1"),
3193 ),
3194 );
3195
3196 let saved = repo.save_graph(graph).unwrap();
3197 assert_eq!(
3198 saved.relations["lines"][0].values.get("order_id"),
3199 Some(&Value::U64(1))
3200 );
3201 assert_eq!(
3202 saved.relations["lines"][0].values.get("product_id"),
3203 Some(&Value::U64(100))
3204 );
3205
3206 let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3207 .fetch_one(&pool)
3208 .await
3209 .unwrap()
3210 .try_get("count")
3211 .unwrap();
3212 let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 10")
3213 .fetch_one(&pool)
3214 .await
3215 .unwrap();
3216 let product_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM product")
3217 .fetch_one(&pool)
3218 .await
3219 .unwrap()
3220 .try_get("count")
3221 .unwrap();
3222
3223 assert_eq!(order_count, 1);
3224 assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 1);
3225 assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 100);
3226 assert_eq!(product_count, 1);
3227 }
3228
3229 #[tokio::test(flavor = "multi_thread")]
3230 async fn sqlite_save_typed_entity_graph_create_inserts_nested_rows() {
3231 use sqlx::{Row, sqlite::SqlitePoolOptions};
3232
3233 let pool = SqlitePoolOptions::new()
3234 .max_connections(1)
3235 .connect("sqlite::memory:")
3236 .await
3237 .unwrap();
3238
3239 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3240 let order = entity();
3241 let line = line_entity();
3242 let product = product_entity();
3243 mutation_executor
3244 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3245 .await
3246 .unwrap();
3247
3248 let executor = SqliteSyncExecutor::new(mutation_executor);
3249 let mut ctx = UserContext::new()
3250 .with_metadata(
3251 InMemoryMetadataStore::new()
3252 .with_entity(order)
3253 .with_entity(line)
3254 .with_entity(product),
3255 )
3256 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3257 ctx.insert_resource(SqliteDialect);
3258 ctx.insert_resource(executor);
3259
3260 let repo = ctx
3261 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3262 .unwrap();
3263 let saved = repo
3264 .save_entity_graph(OrderAggregateRow {
3265 id: 2,
3266 version: 1,
3267 name: "typed-root".to_owned(),
3268 lines: teaql_core::SmartList::from(vec![OrderLineEntityRow {
3269 id: 20,
3270 order_id: 0,
3271 name: "typed-line".to_owned(),
3272 product_id: 200,
3273 product: Some(ProductEntityRow {
3274 id: 200,
3275 name: "typed-sku".to_owned(),
3276 }),
3277 }]),
3278 })
3279 .unwrap();
3280
3281 assert_eq!(saved.values.get("id"), Some(&Value::U64(2)));
3282 assert_eq!(
3283 saved.relations["lines"][0].values.get("order_id"),
3284 Some(&Value::U64(2))
3285 );
3286 assert_eq!(
3287 saved.relations["lines"][0].values.get("product_id"),
3288 Some(&Value::U64(200))
3289 );
3290
3291 let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3292 .fetch_one(&pool)
3293 .await
3294 .unwrap()
3295 .try_get("count")
3296 .unwrap();
3297 let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 20")
3298 .fetch_one(&pool)
3299 .await
3300 .unwrap();
3301 let product_name: String = sqlx::query_scalar("SELECT name FROM product WHERE id = 200")
3302 .fetch_one(&pool)
3303 .await
3304 .unwrap();
3305
3306 assert_eq!(order_count, 1);
3307 assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 2);
3308 assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 200);
3309 assert_eq!(product_name, "typed-sku");
3310 }
3311
3312 #[tokio::test(flavor = "multi_thread")]
3313 async fn sqlite_plan_for_save_graph_assigns_ids_and_batches_before_execution() {
3314 use sqlx::sqlite::SqlitePoolOptions;
3315 use std::time::{SystemTime, UNIX_EPOCH};
3316
3317 let db_path = std::env::temp_dir().join(format!(
3318 "teaql-plan-{}-{}.db",
3319 std::process::id(),
3320 SystemTime::now()
3321 .duration_since(UNIX_EPOCH)
3322 .unwrap()
3323 .as_nanos()
3324 ));
3325 let db_url = format!("sqlite://{}?mode=rwc", db_path.display());
3326 let pool = SqlitePoolOptions::new()
3327 .max_connections(1)
3328 .connect(&db_url)
3329 .await
3330 .unwrap();
3331
3332 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3333 let order = entity();
3334 let line = line_entity();
3335 let product = product_entity();
3336 mutation_executor
3337 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3338 .await
3339 .unwrap();
3340
3341 sqlx::query("INSERT INTO orders (id, version, name) VALUES (100, 1, 'existing')")
3342 .execute(&pool)
3343 .await
3344 .unwrap();
3345 sqlx::query(
3346 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3347 (200, 1, 100, 301, 'line-a'),
3348 (201, 1, 100, 302, 'line-b')",
3349 )
3350 .execute(&pool)
3351 .await
3352 .unwrap();
3353 let seeded_lines: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM orderline")
3354 .fetch_one(&pool)
3355 .await
3356 .unwrap();
3357 assert_eq!(seeded_lines, 2);
3358
3359 let executor = SqliteSyncExecutor::new(mutation_executor);
3360 let mut ctx = UserContext::new()
3361 .with_metadata(
3362 InMemoryMetadataStore::new()
3363 .with_entity(order)
3364 .with_entity(line)
3365 .with_entity(product),
3366 )
3367 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
3368 .with_internal_id_generator(SqliteIdSpaceGenerator::new(pool.clone()));
3369 ctx.insert_resource(SqliteDialect);
3370 ctx.insert_resource(executor);
3371
3372 let graph = GraphNode::new("Order")
3373 .value("id", 100_u64)
3374 .value("version", 1_i64)
3375 .value("name", "existing-updated")
3376 .relation(
3377 "lines",
3378 GraphNode::new("OrderLine")
3379 .value("name", "new-line-a")
3380 .value("product_id", 401_u64),
3381 )
3382 .relation(
3383 "lines",
3384 GraphNode::new("OrderLine")
3385 .value("name", "new-line-b")
3386 .value("product_id", 402_u64),
3387 )
3388 .relation(
3389 "lines",
3390 GraphNode::new("OrderLine")
3391 .value("id", 200_u64)
3392 .value("version", 1_i64)
3393 .value("name", "line-a-updated"),
3394 )
3395 .relation(
3396 "lines",
3397 GraphNode::new("OrderLine")
3398 .value("id", 201_u64)
3399 .value("version", 1_i64)
3400 .value("name", "line-b-updated"),
3401 );
3402
3403 let plan = ctx
3404 .plan_for_save_graph::<SqliteDialect, SqliteSyncExecutor>(graph)
3405 .unwrap();
3406 let counts = plan.grouped_counts();
3407 assert_eq!(
3408 counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
3409 Some(&1)
3410 );
3411 assert_eq!(
3412 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
3413 Some(&2)
3414 );
3415 assert_eq!(
3416 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
3417 Some(&2)
3418 );
3419
3420 let create_batch = plan
3421 .batches
3422 .iter()
3423 .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
3424 .unwrap();
3425 assert_eq!(create_batch.items.len(), 2);
3426 assert_eq!(create_batch.items[0].values.get("id"), Some(&Value::U64(1)));
3427 assert_eq!(create_batch.items[1].values.get("id"), Some(&Value::U64(2)));
3428
3429 let update_batch = plan
3430 .batches
3431 .iter()
3432 .find(|batch| {
3433 batch.entity == "OrderLine"
3434 && batch.kind == GraphMutationKind::Update
3435 && batch.update_fields == vec!["name".to_owned()]
3436 })
3437 .unwrap();
3438 assert_eq!(update_batch.items.len(), 2);
3439
3440 let repo = ctx
3441 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3442 .unwrap();
3443 let saved = repo.execute_graph_plan(plan).unwrap();
3444 let lines = saved.relations.get("lines").unwrap();
3445 assert_eq!(lines.len(), 4);
3446
3447 let generated_count: i64 = sqlx::query_scalar(
3448 "SELECT COUNT(*) FROM orderline WHERE id IN (1, 2) AND order_id = 100",
3449 )
3450 .fetch_one(&pool)
3451 .await
3452 .unwrap();
3453 assert_eq!(generated_count, 2);
3454 let updated_name: String = sqlx::query_scalar("SELECT name FROM orderline WHERE id = 200")
3455 .fetch_one(&pool)
3456 .await
3457 .unwrap();
3458 assert_eq!(updated_name, "line-a-updated");
3459 pool.close().await;
3460 let _ = std::fs::remove_file(db_path);
3461 }
3462
3463 #[tokio::test(flavor = "multi_thread")]
3464 async fn sqlite_save_graph_updates_nested_rows_and_deletes_missing_children() {
3465 use sqlx::{Row, sqlite::SqlitePoolOptions};
3466
3467 let pool = SqlitePoolOptions::new()
3468 .max_connections(1)
3469 .connect("sqlite::memory:")
3470 .await
3471 .unwrap();
3472
3473 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3474 let order = entity();
3475 let line = line_entity();
3476 let product = product_entity();
3477 mutation_executor
3478 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3479 .await
3480 .unwrap();
3481
3482 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'old-root')")
3483 .execute(&pool)
3484 .await
3485 .unwrap();
3486 sqlx::query(
3487 "INSERT INTO product (id, name) VALUES
3488 (100, 'old-sku'),
3489 (101, 'removed-sku')",
3490 )
3491 .execute(&pool)
3492 .await
3493 .unwrap();
3494 sqlx::query(
3495 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
3496 (10, 1, 100, 'old-line'),
3497 (11, 1, 101, 'removed-line')",
3498 )
3499 .execute(&pool)
3500 .await
3501 .unwrap();
3502 sqlx::query("UPDATE orderline SET version = 1")
3503 .execute(&pool)
3504 .await
3505 .unwrap();
3506
3507 let executor = SqliteSyncExecutor::new(mutation_executor);
3508 let mut ctx = UserContext::new()
3509 .with_metadata(
3510 InMemoryMetadataStore::new()
3511 .with_entity(order)
3512 .with_entity(line)
3513 .with_entity(product),
3514 )
3515 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3516 ctx.insert_resource(SqliteDialect);
3517 ctx.insert_resource(executor);
3518
3519 let repo = ctx
3520 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3521 .unwrap();
3522 let graph = GraphNode::new("Order")
3523 .value("id", 1_u64)
3524 .value("version", 1_i64)
3525 .value("name", "new-root")
3526 .relation(
3527 "lines",
3528 GraphNode::new("OrderLine")
3529 .value("id", 10_u64)
3530 .value("version", 1_i64)
3531 .value("name", "new-line")
3532 .relation(
3533 "product",
3534 GraphNode::new("Product")
3535 .value("id", 100_u64)
3536 .value("name", "new-sku"),
3537 ),
3538 )
3539 .relation(
3540 "lines",
3541 GraphNode::new("OrderLine")
3542 .value("id", 12_u64)
3543 .value("version", 1_i64)
3544 .value("name", "added-line")
3545 .relation(
3546 "product",
3547 GraphNode::new("Product")
3548 .value("id", 102_u64)
3549 .value("name", "added-sku"),
3550 ),
3551 );
3552
3553 let saved = repo.save_graph(graph).unwrap();
3554 assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
3555
3556 let order_row = sqlx::query("SELECT version, name FROM orders WHERE id = 1")
3557 .fetch_one(&pool)
3558 .await
3559 .unwrap();
3560 assert_eq!(order_row.try_get::<i64, _>("version").unwrap(), 2);
3561 assert_eq!(order_row.try_get::<String, _>("name").unwrap(), "new-root");
3562
3563 let updated_line =
3564 sqlx::query("SELECT version, product_id, name FROM orderline WHERE id = 10")
3565 .fetch_one(&pool)
3566 .await
3567 .unwrap();
3568 assert_eq!(updated_line.try_get::<i64, _>("version").unwrap(), 2);
3569 assert_eq!(updated_line.try_get::<i64, _>("product_id").unwrap(), 100);
3570 assert_eq!(
3571 updated_line.try_get::<String, _>("name").unwrap(),
3572 "new-line"
3573 );
3574
3575 let added_line =
3576 sqlx::query("SELECT order_id, product_id, name FROM orderline WHERE id = 12")
3577 .fetch_one(&pool)
3578 .await
3579 .unwrap();
3580 assert_eq!(added_line.try_get::<i64, _>("order_id").unwrap(), 1);
3581 assert_eq!(added_line.try_get::<i64, _>("product_id").unwrap(), 102);
3582 assert_eq!(
3583 added_line.try_get::<String, _>("name").unwrap(),
3584 "added-line"
3585 );
3586
3587 let deleted_line = sqlx::query("SELECT version FROM orderline WHERE id = 11")
3588 .fetch_one(&pool)
3589 .await
3590 .unwrap();
3591 assert_eq!(deleted_line.try_get::<i64, _>("version").unwrap(), -2);
3592
3593 let updated_product = sqlx::query("SELECT name FROM product WHERE id = 100")
3594 .fetch_one(&pool)
3595 .await
3596 .unwrap();
3597 assert_eq!(
3598 updated_product.try_get::<String, _>("name").unwrap(),
3599 "new-sku"
3600 );
3601 }
3602
3603 #[tokio::test(flavor = "multi_thread")]
3604 async fn sqlite_save_graph_supports_reference_remove_and_keep_missing() {
3605 use sqlx::{Row, sqlite::SqlitePoolOptions};
3606
3607 let pool = SqlitePoolOptions::new()
3608 .max_connections(1)
3609 .connect("sqlite::memory:")
3610 .await
3611 .unwrap();
3612
3613 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3614 let order = sqlite_entity_keep_missing();
3615 let line = line_entity();
3616 let product = product_entity();
3617 mutation_executor
3618 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3619 .await
3620 .unwrap();
3621
3622 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3623 .execute(&pool)
3624 .await
3625 .unwrap();
3626 sqlx::query("INSERT INTO product (id, name) VALUES (100, 'reference-only')")
3627 .execute(&pool)
3628 .await
3629 .unwrap();
3630 sqlx::query(
3631 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3632 (10, 1, 1, 100, 'remove-me'),
3633 (11, 1, 1, 100, 'keep-me')",
3634 )
3635 .execute(&pool)
3636 .await
3637 .unwrap();
3638
3639 let executor = SqliteSyncExecutor::new(mutation_executor);
3640 let mut ctx = UserContext::new()
3641 .with_metadata(
3642 InMemoryMetadataStore::new()
3643 .with_entity(order)
3644 .with_entity(line)
3645 .with_entity(product),
3646 )
3647 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3648 ctx.insert_resource(SqliteDialect);
3649 ctx.insert_resource(executor);
3650
3651 let repo = ctx
3652 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3653 .unwrap();
3654 let graph = GraphNode::new("Order")
3655 .value("id", 1_u64)
3656 .value("version", 1_i64)
3657 .value("name", "root-updated")
3658 .relation(
3659 "lines",
3660 GraphNode::new("OrderLine")
3661 .value("id", 10_u64)
3662 .value("version", 1_i64)
3663 .remove(),
3664 )
3665 .relation(
3666 "lines",
3667 GraphNode::new("OrderLine")
3668 .value("id", 12_u64)
3669 .value("version", 1_i64)
3670 .value("name", "new-reference-line")
3671 .relation(
3672 "product",
3673 GraphNode::new("Product").value("id", 100_u64).reference(),
3674 ),
3675 );
3676
3677 let saved = repo.save_graph(graph).unwrap();
3678 assert_eq!(
3679 saved.relations["lines"][1].relations["product"][0]
3680 .values
3681 .get("name"),
3682 Some(&Value::Text("reference-only".to_owned()))
3683 );
3684
3685 let removed = sqlx::query("SELECT version FROM orderline WHERE id = 10")
3686 .fetch_one(&pool)
3687 .await
3688 .unwrap();
3689 assert_eq!(removed.try_get::<i64, _>("version").unwrap(), -2);
3690
3691 let kept = sqlx::query("SELECT version, name FROM orderline WHERE id = 11")
3692 .fetch_one(&pool)
3693 .await
3694 .unwrap();
3695 assert_eq!(kept.try_get::<i64, _>("version").unwrap(), 1);
3696 assert_eq!(kept.try_get::<String, _>("name").unwrap(), "keep-me");
3697
3698 let added = sqlx::query("SELECT product_id FROM orderline WHERE id = 12")
3699 .fetch_one(&pool)
3700 .await
3701 .unwrap();
3702 assert_eq!(added.try_get::<i64, _>("product_id").unwrap(), 100);
3703
3704 let product = sqlx::query("SELECT name FROM product WHERE id = 100")
3705 .fetch_one(&pool)
3706 .await
3707 .unwrap();
3708 assert_eq!(
3709 product.try_get::<String, _>("name").unwrap(),
3710 "reference-only"
3711 );
3712 }
3713
3714 #[tokio::test(flavor = "multi_thread")]
3715 async fn sqlite_save_graph_rejects_invalid_reference_and_state_transitions() {
3716 use sqlx::sqlite::SqlitePoolOptions;
3717
3718 let pool = SqlitePoolOptions::new()
3719 .max_connections(1)
3720 .connect("sqlite::memory:")
3721 .await
3722 .unwrap();
3723
3724 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3725 let order = entity();
3726 let line = line_entity();
3727 let product = product_entity();
3728 mutation_executor
3729 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3730 .await
3731 .unwrap();
3732
3733 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3734 .execute(&pool)
3735 .await
3736 .unwrap();
3737 sqlx::query("INSERT INTO product (id, name) VALUES (100, 'valid-reference')")
3738 .execute(&pool)
3739 .await
3740 .unwrap();
3741 sqlx::query(
3742 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3743 (10, 1, 1, 100, 'line-10')",
3744 )
3745 .execute(&pool)
3746 .await
3747 .unwrap();
3748
3749 let executor = SqliteSyncExecutor::new(mutation_executor);
3750 let mut ctx = UserContext::new()
3751 .with_metadata(
3752 InMemoryMetadataStore::new()
3753 .with_entity(order)
3754 .with_entity(line)
3755 .with_entity(product),
3756 )
3757 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3758 ctx.insert_resource(SqliteDialect);
3759 ctx.insert_resource(executor);
3760
3761 let repo = ctx
3762 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3763 .unwrap();
3764
3765 let missing_reference = repo.save_graph(
3766 GraphNode::new("Order").value("id", 1_u64).relation(
3767 "lines",
3768 GraphNode::new("OrderLine")
3769 .value("id", 12_u64)
3770 .value("version", 1_i64)
3771 .value("name", "bad-reference")
3772 .relation(
3773 "product",
3774 GraphNode::new("Product").value("id", 999_u64).reference(),
3775 ),
3776 ),
3777 );
3778 assert!(format!("{missing_reference:?}").contains("does not exist"));
3779
3780 let mutable_reference = repo.save_graph(
3781 GraphNode::new("Order").value("id", 1_u64).relation(
3782 "lines",
3783 GraphNode::new("OrderLine")
3784 .value("id", 12_u64)
3785 .value("version", 1_i64)
3786 .value("name", "mutable-reference")
3787 .relation(
3788 "product",
3789 GraphNode::new("Product")
3790 .value("id", 100_u64)
3791 .value("name", "should-not-mutate")
3792 .reference(),
3793 ),
3794 ),
3795 );
3796 assert!(format!("{mutable_reference:?}").contains("cannot carry mutable field"));
3797
3798 let duplicate_child = repo.save_graph(
3799 GraphNode::new("Order")
3800 .value("id", 1_u64)
3801 .relation(
3802 "lines",
3803 GraphNode::new("OrderLine")
3804 .value("id", 10_u64)
3805 .value("version", 1_i64)
3806 .reference(),
3807 )
3808 .relation(
3809 "lines",
3810 GraphNode::new("OrderLine")
3811 .value("id", 10_u64)
3812 .value("version", 1_i64)
3813 .reference(),
3814 ),
3815 );
3816 assert!(format!("{duplicate_child:?}").contains("duplicate child id"));
3817
3818 let reference_version_conflict = repo.save_graph(
3819 GraphNode::new("Order").value("id", 1_u64).relation(
3820 "lines",
3821 GraphNode::new("OrderLine")
3822 .value("id", 10_u64)
3823 .value("version", 2_i64)
3824 .reference(),
3825 ),
3826 );
3827 assert!(format!("{reference_version_conflict:?}").contains("OptimisticLockConflict"));
3828
3829 let remove_with_child = repo.save_graph(
3830 GraphNode::new("Order").value("id", 1_u64).relation(
3831 "lines",
3832 GraphNode::new("OrderLine")
3833 .value("id", 10_u64)
3834 .value("version", 1_i64)
3835 .relation(
3836 "product",
3837 GraphNode::new("Product").value("id", 100_u64).reference(),
3838 )
3839 .remove(),
3840 ),
3841 );
3842 assert!(format!("{remove_with_child:?}").contains("cannot contain child relations"));
3843
3844 let remove = repo.save_graph(GraphNode::new("Order").value("id", 999_u64).remove());
3845 assert!(format!("{remove:?}").contains("does not exist"));
3846 }
3847
3848 #[tokio::test(flavor = "multi_thread")]
3849 async fn sqlite_graph_write_rolls_back_all_batches_on_failure() {
3850 use sqlx::{Row, sqlite::SqlitePoolOptions};
3851
3852 let pool = SqlitePoolOptions::new()
3853 .max_connections(1)
3854 .connect("sqlite::memory:")
3855 .await
3856 .unwrap();
3857
3858 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3859 let order = entity();
3860 let line = line_entity();
3861 let product = product_entity();
3862 mutation_executor
3863 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3864 .await
3865 .unwrap();
3866
3867 let executor = SqliteSyncExecutor::new(mutation_executor.clone());
3868 let mut ctx = UserContext::new()
3869 .with_metadata(
3870 InMemoryMetadataStore::new()
3871 .with_entity(order)
3872 .with_entity(line)
3873 .with_entity(product),
3874 )
3875 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3876 ctx.insert_resource(SqliteDialect);
3877 ctx.insert_resource(executor);
3878
3879 let repo = ctx
3880 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3881 .unwrap();
3882 let error = repo.save_graph(
3883 GraphNode::new("Order")
3884 .value("id", 1_u64)
3885 .value("version", 1_i64)
3886 .value("name", "rollback-root")
3887 .relation(
3888 "lines",
3889 GraphNode::new("OrderLine")
3890 .value("id", 10_u64)
3891 .value("version", 1_i64)
3892 .value("name", "rollback-line")
3893 .value("product_id", 999_u64)
3894 .relation(
3895 "product",
3896 GraphNode::new("Product").value("id", 999_u64).reference(),
3897 ),
3898 ),
3899 );
3900 assert!(format!("{error:?}").contains("does not exist"));
3901
3902 let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3903 .fetch_one(&pool)
3904 .await
3905 .unwrap()
3906 .try_get("count")
3907 .unwrap();
3908 assert_eq!(count, 0);
3909 }
3910
3911 #[tokio::test]
3912 async fn postgres_executor_ensure_schema_roundtrip_when_database_is_available() {
3913 use sqlx::{PgPool, Row};
3914
3915 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
3916 return;
3917 };
3918
3919 let pool = PgPool::connect(&database_url).await.unwrap();
3920 sqlx::query("DROP TABLE IF EXISTS orderline")
3921 .execute(&pool)
3922 .await
3923 .unwrap();
3924 sqlx::query("DROP TABLE IF EXISTS orders")
3925 .execute(&pool)
3926 .await
3927 .unwrap();
3928
3929 let executor = PgMutationExecutor::new(pool.clone());
3930 let dialect = PostgresDialect;
3931 let order = entity();
3932 let line = line_entity();
3933
3934 executor
3935 .ensure_schema(&dialect, &[&order, &line])
3936 .await
3937 .unwrap();
3938
3939 let tables = sqlx::query(
3940 "SELECT table_name
3941 FROM information_schema.tables
3942 WHERE table_schema = current_schema()
3943 AND table_name IN ('orders', 'orderline')
3944 ORDER BY table_name",
3945 )
3946 .fetch_all(&pool)
3947 .await
3948 .unwrap()
3949 .into_iter()
3950 .map(|row| row.try_get::<String, _>("table_name").unwrap())
3951 .collect::<Vec<_>>();
3952 assert_eq!(tables, vec!["orderline".to_owned(), "orders".to_owned()]);
3953
3954 let soundex: String = sqlx::query_scalar("SELECT soundex('Robert')")
3955 .fetch_one(&pool)
3956 .await
3957 .unwrap();
3958 assert_eq!(soundex, "R163");
3959
3960 sqlx::query(
3961 "INSERT INTO orders (id, version, name) VALUES
3962 (1, 1, 'draft'),
3963 (2, 1, 'submitted'),
3964 (3, 1, 'archived')",
3965 )
3966 .execute(&pool)
3967 .await
3968 .unwrap();
3969
3970 let array_bound_query = dialect
3971 .compile_select(
3972 &order,
3973 &SelectQuery::new("Order")
3974 .filter(
3975 Expr::in_large(
3976 "id",
3977 vec![Value::from(1_u64), Value::from(2_u64), Value::from(3_u64)],
3978 )
3979 .and_expr(Expr::not_in_large("name", vec![Value::from("archived")])),
3980 )
3981 .order_asc("id"),
3982 )
3983 .unwrap();
3984 assert_eq!(
3985 array_bound_query.sql,
3986 "SELECT * FROM \"orders\" WHERE ((\"id\" = ANY($1)) AND (\"name\" <> ALL($2))) ORDER BY \"id\" ASC"
3987 );
3988 let rows = executor.fetch_all(&array_bound_query).await.unwrap();
3989 assert_eq!(rows.len(), 2);
3990 assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
3991 assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
3992
3993 sqlx::query(
3994 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3995 (11, 1, 1, 101, 'line-1'),
3996 (12, 1, 2, 102, 'line-2'),
3997 (13, 1, 3, 103, 'archived-line')",
3998 )
3999 .execute(&pool)
4000 .await
4001 .unwrap();
4002
4003 let subquery = dialect
4004 .compile_select(
4005 &order,
4006 &SelectQuery::new("Order")
4007 .filter(Expr::in_subquery(
4008 "id",
4009 line.clone(),
4010 SelectQuery::new("OrderLine").filter(Expr::contain("name", "line-")),
4011 "order_id",
4012 ))
4013 .order_asc("id"),
4014 )
4015 .unwrap();
4016 assert_eq!(
4017 subquery.sql,
4018 "SELECT * FROM \"orders\" WHERE (\"id\" IN (SELECT \"order_id\" FROM \"orderline\" WHERE (\"name\" LIKE $1))) ORDER BY \"id\" ASC"
4019 );
4020 let rows = executor.fetch_all(&subquery).await.unwrap();
4021 assert_eq!(rows.len(), 2);
4022 assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4023 assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4024
4025 let projected = dialect
4026 .compile_select(
4027 &order,
4028 &SelectQuery::new("Order")
4029 .project("id")
4030 .project_expr("nameSound", Expr::soundex(Expr::column("name")))
4031 .order_gbk_asc("name")
4032 .limit(1),
4033 )
4034 .unwrap();
4035 assert_eq!(
4036 projected.sql,
4037 "SELECT \"id\", SOUNDEX(\"name\") AS \"nameSound\" FROM \"orders\" ORDER BY convert_to(\"name\", 'GBK') ASC LIMIT 1"
4038 );
4039 let rows = executor.fetch_all(&projected).await.unwrap();
4040 assert_eq!(rows.len(), 1);
4041 assert!(rows[0].contains_key("nameSound"));
4042
4043 let aggregate = dialect
4044 .compile_select(
4045 &order,
4046 &SelectQuery::new("Order")
4047 .count("total")
4048 .stddev("version", "stddevVersion")
4049 .var_pop("version", "varPopVersion")
4050 .bit_or("version", "bitOrVersion")
4051 .having(Expr::binary(
4052 Expr::count_all(),
4053 teaql_core::BinaryOp::Gt,
4054 Expr::value(2_i64),
4055 )),
4056 )
4057 .unwrap();
4058 assert_eq!(
4059 aggregate.sql,
4060 "SELECT COUNT(*) AS \"total\", STDDEV(\"version\") AS \"stddevVersion\", VAR_POP(\"version\") AS \"varPopVersion\", BIT_OR(\"version\") AS \"bitOrVersion\" FROM \"orders\" HAVING (COUNT(*) > $1)"
4061 );
4062 let rows = executor.fetch_all(&aggregate).await.unwrap();
4063 assert_eq!(rows.len(), 1);
4064 assert_eq!(rows[0].get("total"), Some(&Value::I64(3)));
4065 assert!(matches!(
4066 rows[0].get("stddevVersion"),
4067 Some(Value::Decimal(_))
4068 ));
4069 assert!(matches!(
4070 rows[0].get("varPopVersion"),
4071 Some(Value::Decimal(_))
4072 ));
4073 assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(1)));
4074
4075 sqlx::query("DROP TABLE orderline")
4076 .execute(&pool)
4077 .await
4078 .unwrap();
4079 sqlx::query("CREATE TABLE orderline (id BIGINT PRIMARY KEY)")
4080 .execute(&pool)
4081 .await
4082 .unwrap();
4083
4084 executor.ensure_schema(&dialect, &[&line]).await.unwrap();
4085
4086 let columns = sqlx::query(
4087 "SELECT column_name
4088 FROM information_schema.columns
4089 WHERE table_schema = current_schema()
4090 AND table_name = 'orderline'
4091 ORDER BY column_name",
4092 )
4093 .fetch_all(&pool)
4094 .await
4095 .unwrap()
4096 .into_iter()
4097 .map(|row| row.try_get::<String, _>("column_name").unwrap())
4098 .collect::<Vec<_>>();
4099 assert!(columns.contains(&"id".to_owned()));
4100 assert!(columns.contains(&"order_id".to_owned()));
4101 assert!(columns.contains(&"product_id".to_owned()));
4102 assert!(columns.contains(&"name".to_owned()));
4103
4104 sqlx::query("DROP TABLE IF EXISTS orderline")
4105 .execute(&pool)
4106 .await
4107 .unwrap();
4108 sqlx::query("DROP TABLE IF EXISTS orders")
4109 .execute(&pool)
4110 .await
4111 .unwrap();
4112 }
4113
4114 #[tokio::test(flavor = "multi_thread")]
4115 async fn postgres_id_space_generator_prepares_repository_insert_when_database_is_available() {
4116 use sqlx::{PgPool, Row};
4117
4118 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4119 return;
4120 };
4121
4122 let pool = PgPool::connect(&database_url).await.unwrap();
4123 sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4124 .execute(&pool)
4125 .await
4126 .unwrap();
4127 sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4128 .execute(&pool)
4129 .await
4130 .unwrap();
4131
4132 let order = entity().table_name("orders_idgen");
4133 let executor = PgMutationExecutor::new(pool.clone());
4134 executor
4135 .ensure_schema(&PostgresDialect, &[&order])
4136 .await
4137 .unwrap();
4138
4139 let mut ctx = UserContext::new()
4140 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
4141 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
4142 .with_internal_id_generator(PgIdSpaceGenerator::new(pool.clone()));
4143 ctx.insert_resource(PostgresDialect);
4144 ctx.insert_resource(PgSyncExecutor::new(executor));
4145
4146 let repo = ctx
4147 .resolve_repository::<PostgresDialect, PgSyncExecutor>("Order")
4148 .unwrap();
4149 repo.insert(
4150 &repo
4151 .insert_command()
4152 .value("version", 1_i64)
4153 .value("name", "generated-1"),
4154 )
4155 .unwrap();
4156 repo.insert(
4157 &repo
4158 .insert_command()
4159 .value("version", 1_i64)
4160 .value("name", "generated-2"),
4161 )
4162 .unwrap();
4163
4164 let ids = sqlx::query("SELECT id FROM orders_idgen ORDER BY id")
4165 .fetch_all(&pool)
4166 .await
4167 .unwrap()
4168 .into_iter()
4169 .map(|row| row.try_get::<i64, _>("id").unwrap())
4170 .collect::<Vec<_>>();
4171 assert_eq!(ids, vec![1, 2]);
4172
4173 let current: i64 = sqlx::query_scalar(
4174 "SELECT current_level FROM teaql_id_space WHERE type_name = 'Order'",
4175 )
4176 .fetch_one(&pool)
4177 .await
4178 .unwrap();
4179 assert_eq!(current, 2);
4180
4181 sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4182 .execute(&pool)
4183 .await
4184 .unwrap();
4185 sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4186 .execute(&pool)
4187 .await
4188 .unwrap();
4189 }
4190
4191 #[tokio::test]
4192 async fn postgres_executor_runs_extended_aggregates_when_database_is_available() {
4193 use sqlx::PgPool;
4194
4195 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4196 return;
4197 };
4198
4199 let pool = PgPool::connect(&database_url).await.unwrap();
4200 sqlx::query("DROP TABLE IF EXISTS orders_agg")
4201 .execute(&pool)
4202 .await
4203 .unwrap();
4204
4205 let order = entity().table_name("orders_agg");
4206 let executor = PgMutationExecutor::new(pool.clone());
4207 let dialect = PostgresDialect;
4208 executor.ensure_schema(&dialect, &[&order]).await.unwrap();
4209
4210 sqlx::query(
4211 "INSERT INTO orders_agg (id, version, name) VALUES
4212 (1, 1, 'A'),
4213 (2, 2, 'A'),
4214 (3, 3, 'B'),
4215 (4, 4, 'B')",
4216 )
4217 .execute(&pool)
4218 .await
4219 .unwrap();
4220
4221 let aggregate = dialect
4222 .compile_select(
4223 &order,
4224 &SelectQuery::new("Order")
4225 .count("total")
4226 .count_field("version", "versionCount")
4227 .sum("version", "sumVersion")
4228 .avg("version", "avgVersion")
4229 .min("version", "minVersion")
4230 .max("version", "maxVersion")
4231 .stddev("version", "stddevVersion")
4232 .stddev_pop("version", "stddevPopVersion")
4233 .var_samp("version", "varSampVersion")
4234 .var_pop("version", "varPopVersion")
4235 .bit_and("version", "bitAndVersion")
4236 .bit_or("version", "bitOrVersion")
4237 .bit_xor("version", "bitXorVersion")
4238 .having(Expr::binary(
4239 Expr::count_all(),
4240 teaql_core::BinaryOp::Gt,
4241 Expr::value(3_i64),
4242 )),
4243 )
4244 .unwrap();
4245 let rows = executor.fetch_all(&aggregate).await.unwrap();
4246 assert_eq!(rows.len(), 1);
4247 let row = &rows[0];
4248 assert_eq!(row.get("total"), Some(&Value::I64(4)));
4249 assert_eq!(row.get("versionCount"), Some(&Value::I64(4)));
4250 assert_eq!(
4251 row.get("sumVersion"),
4252 Some(&Value::Decimal(Decimal::new(10, 0)))
4253 );
4254 assert_eq!(
4255 row.get("avgVersion"),
4256 Some(&Value::Decimal(Decimal::new(25, 1)))
4257 );
4258 assert_eq!(row.get("minVersion"), Some(&Value::I64(1)));
4259 assert_eq!(row.get("maxVersion"), Some(&Value::I64(4)));
4260 assert!(matches!(row.get("stddevVersion"), Some(Value::Decimal(_))));
4261 assert!(matches!(
4262 row.get("stddevPopVersion"),
4263 Some(Value::Decimal(_))
4264 ));
4265 assert!(matches!(row.get("varSampVersion"), Some(Value::Decimal(_))));
4266 assert!(matches!(row.get("varPopVersion"), Some(Value::Decimal(_))));
4267 assert_eq!(row.get("bitAndVersion"), Some(&Value::I64(0)));
4268 assert_eq!(row.get("bitOrVersion"), Some(&Value::I64(7)));
4269 assert_eq!(row.get("bitXorVersion"), Some(&Value::I64(4)));
4270
4271 let grouped = dialect
4272 .compile_select(
4273 &order,
4274 &SelectQuery::new("Order")
4275 .group_by("name")
4276 .count("total")
4277 .sum("version", "sumVersion")
4278 .having(Expr::binary(
4279 Expr::count_all(),
4280 teaql_core::BinaryOp::Gt,
4281 Expr::value(1_i64),
4282 ))
4283 .order_asc("name"),
4284 )
4285 .unwrap();
4286 let rows = executor.fetch_all(&grouped).await.unwrap();
4287 assert_eq!(rows.len(), 2);
4288 assert_eq!(rows[0].get("name"), Some(&Value::Text("A".to_owned())));
4289 assert_eq!(rows[0].get("total"), Some(&Value::I64(2)));
4290 assert_eq!(
4291 rows[0].get("sumVersion"),
4292 Some(&Value::Decimal(Decimal::new(3, 0)))
4293 );
4294 assert_eq!(rows[1].get("name"), Some(&Value::Text("B".to_owned())));
4295 assert_eq!(rows[1].get("total"), Some(&Value::I64(2)));
4296 assert_eq!(
4297 rows[1].get("sumVersion"),
4298 Some(&Value::Decimal(Decimal::new(7, 0)))
4299 );
4300
4301 sqlx::query("DROP TABLE IF EXISTS orders_agg")
4302 .execute(&pool)
4303 .await
4304 .unwrap();
4305 }
4306
4307 #[tokio::test]
4308 async fn user_context_can_ensure_postgres_schema_when_database_is_available() {
4309 use sqlx::{PgPool, Row};
4310
4311 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4312 return;
4313 };
4314
4315 let pool = PgPool::connect(&database_url).await.unwrap();
4316 sqlx::query("DROP TABLE IF EXISTS product_ctx")
4317 .execute(&pool)
4318 .await
4319 .unwrap();
4320 sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4321 .execute(&pool)
4322 .await
4323 .unwrap();
4324 sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4325 .execute(&pool)
4326 .await
4327 .unwrap();
4328
4329 let order = entity().table_name("orders_ctx");
4330 let line = line_entity().table_name("orderline_ctx");
4331 let product = product_entity().table_name("product_ctx");
4332 let module = super::RuntimeModule::new()
4333 .descriptor(order)
4334 .descriptor(line)
4335 .descriptor(product);
4336 let mut ctx = super::UserContext::new().with_module(module);
4337 ctx.insert_resource(PostgresDialect);
4338 ctx.insert_resource(PgMutationExecutor::new(pool.clone()));
4339
4340 ctx.ensure_postgres_schema().await.unwrap();
4341
4342 let tables = sqlx::query(
4343 "SELECT table_name
4344 FROM information_schema.tables
4345 WHERE table_schema = current_schema()
4346 AND table_name IN ('orders_ctx', 'orderline_ctx', 'product_ctx')
4347 ORDER BY table_name",
4348 )
4349 .fetch_all(&pool)
4350 .await
4351 .unwrap()
4352 .into_iter()
4353 .map(|row| row.try_get::<String, _>("table_name").unwrap())
4354 .collect::<Vec<_>>();
4355
4356 assert_eq!(
4357 tables,
4358 vec![
4359 "orderline_ctx".to_owned(),
4360 "orders_ctx".to_owned(),
4361 "product_ctx".to_owned()
4362 ]
4363 );
4364
4365 sqlx::query("DROP TABLE IF EXISTS product_ctx")
4366 .execute(&pool)
4367 .await
4368 .unwrap();
4369 sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4370 .execute(&pool)
4371 .await
4372 .unwrap();
4373 sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4374 .execute(&pool)
4375 .await
4376 .unwrap();
4377 }
4378
4379 #[tokio::test(flavor = "multi_thread")]
4380 async fn postgres_graph_write_transaction_rolls_back_when_database_is_available() {
4381 use sqlx::{PgPool, Row};
4382
4383 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4384 return;
4385 };
4386
4387 let pool = PgPool::connect(&database_url).await.unwrap();
4388 sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4389 .execute(&pool)
4390 .await
4391 .unwrap();
4392 sqlx::query("DROP TABLE IF EXISTS product_tx")
4393 .execute(&pool)
4394 .await
4395 .unwrap();
4396 sqlx::query("DROP TABLE IF EXISTS orders_tx")
4397 .execute(&pool)
4398 .await
4399 .unwrap();
4400
4401 let order = entity().table_name("orders_tx");
4402 let line = line_entity().table_name("orderline_tx");
4403 let product = product_entity().table_name("product_tx");
4404 let schema_executor = PgMutationExecutor::new(pool.clone());
4405 schema_executor
4406 .ensure_schema(&PostgresDialect, &[&order, &line, &product])
4407 .await
4408 .unwrap();
4409
4410 let tx_executor = PgTransactionExecutor::begin(&pool).await.unwrap();
4411 let sync_executor = PgTxSyncExecutor::new(tx_executor.clone());
4412 let mut ctx = UserContext::new()
4413 .with_metadata(
4414 InMemoryMetadataStore::new()
4415 .with_entity(order)
4416 .with_entity(line)
4417 .with_entity(product),
4418 )
4419 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
4420 ctx.insert_resource(PostgresDialect);
4421 ctx.insert_resource(sync_executor);
4422
4423 let repo = ctx
4424 .resolve_repository::<PostgresDialect, PgTxSyncExecutor>("Order")
4425 .unwrap();
4426 repo.save_graph(
4427 GraphNode::new("Order")
4428 .value("id", 1_u64)
4429 .value("version", 1_i64)
4430 .value("name", "pg-rollback-root")
4431 .relation(
4432 "lines",
4433 GraphNode::new("OrderLine")
4434 .value("id", 10_u64)
4435 .value("version", 1_i64)
4436 .value("name", "pg-rollback-line")
4437 .relation(
4438 "product",
4439 GraphNode::new("Product")
4440 .value("id", 100_u64)
4441 .value("name", "pg-rollback-sku"),
4442 ),
4443 ),
4444 )
4445 .unwrap();
4446
4447 tx_executor.rollback().await.unwrap();
4448
4449 let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders_tx")
4450 .fetch_one(&pool)
4451 .await
4452 .unwrap()
4453 .try_get("count")
4454 .unwrap();
4455 assert_eq!(count, 0);
4456
4457 sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4458 .execute(&pool)
4459 .await
4460 .unwrap();
4461 sqlx::query("DROP TABLE IF EXISTS product_tx")
4462 .execute(&pool)
4463 .await
4464 .unwrap();
4465 sqlx::query("DROP TABLE IF EXISTS orders_tx")
4466 .execute(&pool)
4467 .await
4468 .unwrap();
4469 }
4470}
4471pub use checker::{
4472 CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
4473 CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, clear_record_status,
4474 mark_record_status,
4475};