1mod checker;
2mod context;
3mod entity_runtime;
4mod error;
5mod event;
6mod graph;
7mod id;
8mod language;
9mod memory;
10mod registry;
11mod repository;
12
13pub use context::{SqlLogEntry, SqlLogOperation, SqlLogOptions, UserContext};
14pub use entity_runtime::{ChangeSetStack, EntityChangeSet, EntityKey, EntityRoot, RootContext};
15pub use error::{ContextError, RepositoryError, RuntimeError};
16pub use event::{EntityEvent, EntityEventKind, EntityEventSink, InMemoryEntityEventSink};
17pub use graph::{
18 GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem, GraphNode,
19 GraphOperation, sorted_update_fields,
20};
21pub(crate) use id::local_id_generator;
22pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
23pub use language::{
24 BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
25};
26pub use memory::{MemoryRepository, MemoryRepositoryError};
27pub use registry::{
28 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
29 MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
30 RuntimeModule,
31};
32pub use repository::{
33 AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
34 QueryExecutor, RelationLoadPlan, Repository, ResolvedRepository,
35};
36
37#[cfg(feature = "sqlx")]
38pub mod sqlx_support;
39
40#[cfg(test)]
41mod tests {
42 use std::collections::{BTreeMap, VecDeque};
43 use std::sync::{Arc, Mutex};
44
45 use super::{
46 AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResults,
47 Checker, EntityEvent, EntityEventKind, EntityEventSink, GraphMutationKind, GraphNode,
48 GraphTransactionBoundary, InMemoryAggregationCache, InMemoryCheckerRegistry,
49 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
50 InternalIdGenerator, Language, MemoryRepository, MetadataStore, ObjectLocation,
51 QueryExecutor, Repository, RepositoryBehavior, RepositoryError, RuntimeError,
52 RuntimeModule, SqlLogOperation, SqlLogOptions, UserContext, translate_check_result,
53 };
54 use teaql_core::{
55 Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
56 EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
57 RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
58 };
59 use teaql_dialect_pg::PostgresDialect;
60 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
61 use teaql_sql::CompiledQuery;
62
63 const ORDER_DEFAULT_PROJECTION: &str = "\"id\", \"version\", \"name\"";
64
65 fn entity() -> EntityDescriptor {
66 EntityDescriptor::new("Order")
67 .table_name("orders")
68 .property(
69 PropertyDescriptor::new("id", DataType::U64)
70 .column_name("id")
71 .id()
72 .not_null(),
73 )
74 .property(
75 PropertyDescriptor::new("version", DataType::I64)
76 .column_name("version")
77 .version()
78 .not_null(),
79 )
80 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
81 .relation(
82 teaql_core::RelationDescriptor::new("lines", "OrderLine")
83 .local_key("id")
84 .foreign_key("order_id")
85 .many(),
86 )
87 }
88
89 fn line_entity() -> EntityDescriptor {
90 EntityDescriptor::new("OrderLine")
91 .table_name("orderline")
92 .property(
93 PropertyDescriptor::new("id", DataType::U64)
94 .column_name("id")
95 .id()
96 .not_null(),
97 )
98 .property(
99 PropertyDescriptor::new("version", DataType::I64)
100 .column_name("version")
101 .version(),
102 )
103 .property(
104 PropertyDescriptor::new("order_id", DataType::U64)
105 .column_name("order_id")
106 .not_null(),
107 )
108 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
109 .property(
110 PropertyDescriptor::new("product_id", DataType::U64)
111 .column_name("product_id")
112 .not_null(),
113 )
114 .relation(
115 teaql_core::RelationDescriptor::new("product", "Product")
116 .local_key("product_id")
117 .foreign_key("id"),
118 )
119 }
120
121 fn product_entity() -> EntityDescriptor {
122 EntityDescriptor::new("Product")
123 .table_name("product")
124 .property(
125 PropertyDescriptor::new("id", DataType::U64)
126 .column_name("id")
127 .id()
128 .not_null(),
129 )
130 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
131 }
132
133 #[derive(Debug, Default)]
134 struct StubExecutor {
135 affected: u64,
136 rows: Vec<Record>,
137 }
138
139 #[derive(Debug, Default)]
140 struct QueueExecutor {
141 affected: u64,
142 rows: Mutex<VecDeque<Vec<Record>>>,
143 queries: Mutex<Vec<String>>,
144 }
145
146 struct OrderBehavior;
147
148 #[allow(dead_code)]
149 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
150 #[teaql(entity = "CatalogProduct", table = "catalog_product")]
151 struct CatalogProductRow {
152 #[teaql(id)]
153 id: u64,
154 name: String,
155 }
156
157 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
158 #[teaql(entity = "OrderAggregate", table = "orders")]
159 struct OrderAggregateDynamic {
160 #[teaql(id)]
161 id: u64,
162 #[teaql(dynamic)]
163 dynamic: BTreeMap<String, Value>,
164 }
165
166 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
167 #[teaql(entity = "Product", table = "product")]
168 struct ProductEntityRow {
169 #[teaql(id)]
170 id: u64,
171 name: String,
172 }
173
174 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
175 #[teaql(entity = "OrderLine", table = "orderline")]
176 struct OrderLineEntityRow {
177 #[teaql(id)]
178 id: u64,
179 #[teaql(column = "order_id")]
180 order_id: u64,
181 name: String,
182 #[teaql(column = "product_id")]
183 product_id: u64,
184 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
185 product: Option<ProductEntityRow>,
186 }
187
188 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
189 #[teaql(entity = "Order", table = "orders")]
190 struct OrderAggregateRow {
191 #[teaql(id)]
192 id: u64,
193 #[teaql(version)]
194 version: i64,
195 name: String,
196 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
197 lines: teaql_core::SmartList<OrderLineEntityRow>,
198 }
199
200 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
201 #[teaql(entity = "Order", table = "orders")]
202 struct Order {
203 #[teaql(id)]
204 id: u64,
205 #[teaql(version)]
206 version: i64,
207 name: String,
208 }
209
210 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
211 #[teaql(entity = "Product", table = "product")]
212 struct TypedGraphProduct {
213 #[teaql(id)]
214 id: Option<u64>,
215 name: String,
216 }
217
218 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
219 #[teaql(entity = "OrderLine", table = "orderline")]
220 struct TypedGraphLine {
221 #[teaql(id)]
222 id: Option<u64>,
223 #[teaql(column = "order_id")]
224 order_id: Option<u64>,
225 name: String,
226 #[teaql(column = "product_id")]
227 product_id: Option<u64>,
228 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
229 product: Option<TypedGraphProduct>,
230 }
231
232 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
233 #[teaql(entity = "Order", table = "orders")]
234 struct TypedGraphOrder {
235 #[teaql(id)]
236 id: Option<u64>,
237 #[teaql(version)]
238 version: i64,
239 name: String,
240 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
241 lines: teaql_core::SmartList<TypedGraphLine>,
242 }
243
244 #[derive(Debug, PartialEq, Eq)]
245 struct OrderEntity {
246 id: u64,
247 version: i64,
248 name: String,
249 }
250
251 impl teaql_core::TeaqlEntity for OrderEntity {
252 fn entity_descriptor() -> EntityDescriptor {
253 entity()
254 }
255 }
256
257 impl Entity for OrderEntity {
258 fn from_record(record: Record) -> Result<Self, EntityError> {
259 let id = match record.get("id") {
260 Some(Value::U64(v)) => *v,
261 Some(Value::I64(v)) if *v >= 0 => *v as u64,
262 other => {
263 return Err(EntityError::new(
264 "Order",
265 format!("invalid id field: {other:?}"),
266 ));
267 }
268 };
269 let version = match record.get("version") {
270 Some(Value::I64(v)) => *v,
271 other => {
272 return Err(EntityError::new(
273 "Order",
274 format!("invalid version field: {other:?}"),
275 ));
276 }
277 };
278 let name = match record.get("name") {
279 Some(Value::Text(v)) => v.clone(),
280 other => {
281 return Err(EntityError::new(
282 "Order",
283 format!("invalid name field: {other:?}"),
284 ));
285 }
286 };
287 Ok(Self { id, version, name })
288 }
289
290 fn into_record(self) -> Record {
291 Record::from([
292 (String::from("id"), Value::U64(self.id)),
293 (String::from("version"), Value::I64(self.version)),
294 (String::from("name"), Value::Text(self.name)),
295 ])
296 }
297 }
298
299 #[derive(Debug)]
300 struct StubError;
301
302 impl std::fmt::Display for StubError {
303 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304 write!(f, "stub error")
305 }
306 }
307
308 impl std::error::Error for StubError {}
309
310 impl QueryExecutor for StubExecutor {
311 type Error = StubError;
312
313 fn fetch_all(&self, _query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
314 Ok(self.rows.clone())
315 }
316
317 fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
318 Ok(self.affected)
319 }
320
321 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
322 Ok(GraphTransactionBoundary::Started)
323 }
324 }
325
326 impl QueryExecutor for QueueExecutor {
327 type Error = StubError;
328
329 fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
330 self.queries.lock().unwrap().push(query.sql.clone());
331 Ok(self.rows.lock().unwrap().pop_front().unwrap_or_default())
332 }
333
334 fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
335 Ok(self.affected)
336 }
337 }
338
339 #[test]
340 fn user_context_records_configured_sql_logs() {
341 let mut ctx = UserContext::new()
342 .with_module(crate::module!(Order))
343 .with_sql_log_options(SqlLogOptions::select_only());
344 ctx.insert_resource(PostgresDialect);
345 ctx.insert_resource(StubExecutor {
346 affected: 1,
347 rows: Vec::new(),
348 });
349
350 {
351 let repo = ctx
352 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
353 .unwrap();
354 repo.fetch_all(&SelectQuery::new("Order").filter(Expr::eq("name", "Bob's Shop")))
355 .unwrap();
356 repo.insert(&InsertCommand::new("Order").value("name", "created"))
357 .unwrap();
358 }
359
360 let logs = ctx.sql_logs();
361 assert_eq!(logs.len(), 1);
362 assert_eq!(logs[0].operation, SqlLogOperation::Select);
363 assert_eq!(
364 logs[0].debug_sql,
365 format!(
366 "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE (\"name\" = 'Bob''s Shop')"
367 )
368 );
369
370 ctx.set_sql_log_options(SqlLogOptions::mutation_only());
371 ctx.clear_sql_logs();
372 ctx.resolve_repository::<PostgresDialect, StubExecutor>("Order")
373 .unwrap()
374 .update(
375 &UpdateCommand::new("Order", 1_u64)
376 .value("name", "updated")
377 .expected_version(1),
378 )
379 .unwrap();
380
381 let logs = ctx.sql_logs();
382 assert_eq!(logs.len(), 1);
383 assert_eq!(logs[0].operation, SqlLogOperation::Update);
384 assert!(logs[0].debug_sql.contains("UPDATE \"orders\" SET"));
385 assert!(logs[0].debug_sql.contains("'updated'"));
386 }
387
388 impl RepositoryBehavior for OrderBehavior {
389 fn before_select(
390 &self,
391 _ctx: &UserContext,
392 query: &mut teaql_core::SelectQuery,
393 ) -> Result<(), RuntimeError> {
394 query.filter = Some(Expr::eq("version", 1_i64));
395 Ok(())
396 }
397
398 fn before_insert(
399 &self,
400 _ctx: &UserContext,
401 command: &mut InsertCommand,
402 ) -> Result<(), RuntimeError> {
403 command
404 .values
405 .entry("version".to_owned())
406 .or_insert(Value::I64(1));
407 Ok(())
408 }
409
410 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
411 vec!["lines".to_owned()]
412 }
413 }
414
415 struct ContextAwareOrderBehavior;
416 struct OrderChecker;
417 #[derive(Clone)]
418 struct RecordingEventSink {
419 events: Arc<Mutex<Vec<EntityEvent>>>,
420 }
421
422 impl RepositoryBehavior for ContextAwareOrderBehavior {
423 fn before_insert(
424 &self,
425 ctx: &UserContext,
426 command: &mut InsertCommand,
427 ) -> Result<(), RuntimeError> {
428 let tenant = ctx
429 .get_named_resource::<String>("tenant")
430 .cloned()
431 .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
432 let version = *ctx
433 .get_named_resource::<i64>("initial_version")
434 .ok_or_else(|| {
435 RuntimeError::Behavior("missing initial_version resource".to_owned())
436 })?;
437 let trace_id = match ctx.local("trace_id") {
438 Some(Value::Text(value)) => value.clone(),
439 other => {
440 return Err(RuntimeError::Behavior(format!(
441 "missing trace_id local, got {other:?}"
442 )));
443 }
444 };
445
446 command
447 .values
448 .entry("name".to_owned())
449 .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
450 command
451 .values
452 .entry("version".to_owned())
453 .or_insert(Value::I64(version));
454 Ok(())
455 }
456 }
457
458 impl Checker for OrderChecker {
459 fn entity(&self) -> &str {
460 "Order"
461 }
462
463 fn check_and_fix(
464 &self,
465 _ctx: &UserContext,
466 record: &mut Record,
467 location: &ObjectLocation,
468 results: &mut CheckResults,
469 ) {
470 let status = CheckObjectStatus::from_record(record);
471 if status.is_create() {
472 self.required(record, "name", location, results);
473 record.entry("version".to_owned()).or_insert(Value::I64(1));
474 }
475 if status.is_update()
476 && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
477 {
478 record.insert(
479 "name".to_owned(),
480 Value::Text("graph-update-checked".to_owned()),
481 );
482 }
483 self.min_string_length(record, "name", 3, location, results);
484 }
485 }
486
487 impl EntityEventSink for RecordingEventSink {
488 fn on_event(&self, _ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
489 self.events.lock().unwrap().push(event.clone());
490 Ok(())
491 }
492 }
493
494 struct FixedIdGenerator(u64);
495
496 impl InternalIdGenerator for FixedIdGenerator {
497 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
498 Ok(self.0)
499 }
500 }
501
502 struct SequentialIdGenerator {
503 next: Mutex<u64>,
504 }
505
506 impl SequentialIdGenerator {
507 fn new(next: u64) -> Self {
508 Self {
509 next: Mutex::new(next),
510 }
511 }
512 }
513
514 impl InternalIdGenerator for SequentialIdGenerator {
515 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
516 let mut next = self
517 .next
518 .lock()
519 .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
520 let id = *next;
521 *next += 1;
522 Ok(id)
523 }
524 }
525
526 #[test]
527 fn metadata_store_registers_entities() {
528 let store = InMemoryMetadataStore::new().with_entity(entity());
529 assert!(store.entity("Order").is_some());
530 }
531
532 #[test]
533 fn runtime_module_registers_descriptor_into_context() {
534 let ctx = UserContext::new().with_module(RuntimeModule::new().descriptor(entity()));
535 assert!(ctx.entity("Order").is_some());
536 assert!(ctx.has_repository("Order"));
537 }
538
539 #[test]
540 fn runtime_module_registers_derived_entity_and_behavior() {
541 let ctx = UserContext::new().with_module(
542 RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior),
543 );
544 assert!(ctx.entity("CatalogProduct").is_some());
545 assert!(ctx.has_repository("CatalogProduct"));
546 assert!(ctx.repository_behavior("CatalogProduct").is_some());
547 }
548
549 #[test]
550 fn module_macro_registers_multiple_entities() {
551 let ctx = UserContext::new().with_module(crate::module!(CatalogProductRow));
552 assert!(ctx.entity("CatalogProduct").is_some());
553 assert!(ctx.has_repository("CatalogProduct"));
554 }
555
556 #[test]
557 fn module_macro_registers_entity_behavior_pairs() {
558 let ctx =
559 UserContext::new().with_module(crate::module!(CatalogProductRow => OrderBehavior));
560 assert!(ctx.entity("CatalogProduct").is_some());
561 assert!(ctx.repository_behavior("CatalogProduct").is_some());
562 }
563
564 #[test]
565 fn repository_returns_optimistic_lock_conflict() {
566 let store = InMemoryMetadataStore::new().with_entity(entity());
567 let executor = StubExecutor {
568 affected: 0,
569 rows: Vec::new(),
570 };
571 let repo = Repository::new(&PostgresDialect, &store, &executor);
572
573 let err = repo
574 .update(
575 &UpdateCommand::new("Order", 1_u64)
576 .expected_version(3)
577 .value("name", "next"),
578 )
579 .unwrap_err();
580
581 match err {
582 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. }) => {}
583 other => panic!("unexpected error: {other}"),
584 }
585 }
586
587 #[test]
588 fn user_context_indexes_resources_and_locals() {
589 let mut ctx =
590 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
591 ctx.insert_resource::<u64>(42);
592 ctx.insert_named_resource("tenant", String::from("acme"));
593 ctx.put_local("trace_id", "req-1");
594
595 assert!(ctx.entity("Order").is_some());
596 assert_eq!(ctx.get_resource::<u64>(), Some(&42));
597 assert_eq!(
598 ctx.get_named_resource::<String>("tenant"),
599 Some(&String::from("acme"))
600 );
601 assert_eq!(
602 ctx.local("trace_id"),
603 Some(&Value::Text("req-1".to_owned()))
604 );
605 }
606
607 #[test]
608 fn user_context_builds_context_repository() {
609 let mut ctx =
610 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
611 ctx.insert_resource(PostgresDialect);
612 ctx.insert_resource(StubExecutor {
613 affected: 1,
614 rows: Vec::new(),
615 });
616
617 let repo = ctx.repository::<PostgresDialect, StubExecutor>().unwrap();
618 let affected = repo
619 .update(
620 &UpdateCommand::new("Order", 1_u64)
621 .expected_version(3)
622 .value("name", "next"),
623 )
624 .unwrap();
625
626 assert_eq!(affected, 1);
627 }
628
629 #[test]
630 fn user_context_resolves_repository_by_entity_type() {
631 let mut ctx = UserContext::new()
632 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
633 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
634 ctx.insert_resource(PostgresDialect);
635 ctx.insert_resource(StubExecutor {
636 affected: 1,
637 rows: Vec::new(),
638 });
639
640 let repo = ctx
641 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
642 .unwrap();
643 assert_eq!(repo.entity(), "Order");
644 assert_eq!(repo.select().entity, "Order");
645
646 let affected = repo
647 .insert(
648 &repo
649 .insert_command()
650 .value("id", 1_u64)
651 .value("version", 1_i64)
652 .value("name", "n"),
653 )
654 .unwrap();
655 assert_eq!(affected, 1);
656 }
657
658 #[test]
659 fn resolved_repository_applies_behavior_hooks() {
660 let mut ctx = UserContext::new()
661 .with_metadata(
662 InMemoryMetadataStore::new()
663 .with_entity(entity())
664 .with_entity(line_entity())
665 .with_entity(product_entity()),
666 )
667 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
668 .with_repository_behavior_registry(
669 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
670 );
671 ctx.insert_resource(PostgresDialect);
672 ctx.insert_resource(StubExecutor {
673 affected: 1,
674 rows: Vec::new(),
675 });
676
677 let repo = ctx
678 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
679 .unwrap();
680
681 let compiled = repo.compile(&repo.select()).unwrap();
682 assert!(compiled.sql.contains("WHERE (\"version\" = $1)"));
683
684 let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
685 let affected = repo.insert(&insert).unwrap();
686 assert_eq!(affected, 1);
687 assert_eq!(repo.relation_loads(), vec!["lines".to_owned()]);
688 }
689
690 #[test]
691 fn resolved_repository_prepares_insert_command_with_generated_id() {
692 let mut ctx = UserContext::new()
693 .with_metadata(
694 InMemoryMetadataStore::new()
695 .with_entity(entity())
696 .with_entity(line_entity())
697 .with_entity(product_entity()),
698 )
699 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
700 .with_repository_behavior_registry(
701 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
702 )
703 .with_internal_id_generator(FixedIdGenerator(42));
704 ctx.insert_resource(PostgresDialect);
705 ctx.insert_resource(StubExecutor {
706 affected: 1,
707 rows: Vec::new(),
708 });
709
710 let repo = ctx
711 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
712 .unwrap();
713
714 let prepared = repo
715 .prepare_insert_command(&repo.insert_command().value("name", "n"))
716 .unwrap();
717
718 assert_eq!(prepared.values.get("id"), Some(&Value::U64(42)));
719 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
720 assert_eq!(
721 prepared.values.get("name"),
722 Some(&Value::Text("n".to_owned()))
723 );
724 }
725
726 #[test]
727 fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
728 let mut ctx = UserContext::new()
729 .with_metadata(
730 InMemoryMetadataStore::new()
731 .with_entity(entity())
732 .with_entity(line_entity())
733 .with_entity(product_entity()),
734 )
735 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
736 .with_repository_behavior_registry(
737 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
738 )
739 .with_internal_id_generator(SequentialIdGenerator::new(500));
740 ctx.insert_resource(PostgresDialect);
741 ctx.insert_resource(StubExecutor {
742 affected: 1,
743 rows: Vec::new(),
744 });
745
746 let repo = ctx
747 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
748 .unwrap();
749 let graph = GraphNode::new("Order").value("name", "root").relation(
750 "lines",
751 GraphNode::new("OrderLine")
752 .value("name", "line-1")
753 .relation("product", GraphNode::new("Product").value("name", "sku-1")),
754 );
755
756 let saved = repo.save_graph(graph).unwrap();
757
758 assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
759 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
760 let lines = saved.relations.get("lines").unwrap();
761 assert_eq!(lines.len(), 1);
762 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(501)));
763 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(500)));
764 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(502)));
765 let product = lines[0].relations.get("product").unwrap();
766 assert_eq!(product[0].values.get("id"), Some(&Value::U64(502)));
767 }
768
769 #[test]
770 fn resolved_repository_extracts_and_saves_typed_entity_graph() {
771 let mut ctx = UserContext::new()
772 .with_metadata(
773 InMemoryMetadataStore::new()
774 .with_entity(entity())
775 .with_entity(line_entity())
776 .with_entity(product_entity()),
777 )
778 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
779 .with_internal_id_generator(SequentialIdGenerator::new(700));
780 ctx.insert_resource(PostgresDialect);
781 ctx.insert_resource(StubExecutor {
782 affected: 1,
783 rows: Vec::new(),
784 });
785
786 let repo = ctx
787 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
788 .unwrap();
789 let order = TypedGraphOrder {
790 id: None,
791 version: 1,
792 name: "typed-root".to_owned(),
793 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
794 id: None,
795 order_id: None,
796 name: "typed-line".to_owned(),
797 product_id: None,
798 product: Some(TypedGraphProduct {
799 id: None,
800 name: "typed-product".to_owned(),
801 }),
802 }]),
803 };
804
805 let extracted = repo.graph_node_from_entity(order).unwrap();
806 assert_eq!(extracted.entity, "Order");
807 assert_eq!(
808 extracted.values.get("name"),
809 Some(&Value::Text("typed-root".to_owned()))
810 );
811 assert_eq!(extracted.values.get("id"), Some(&Value::Null));
812 assert_eq!(extracted.relations["lines"].len(), 1);
813 assert_eq!(
814 extracted.relations["lines"][0].values.get("name"),
815 Some(&Value::Text("typed-line".to_owned()))
816 );
817 assert_eq!(
818 extracted.relations["lines"][0].relations["product"].len(),
819 1
820 );
821
822 let saved = repo.save_graph(extracted).unwrap();
823 assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
824 let lines = saved.relations.get("lines").unwrap();
825 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(701)));
826 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(700)));
827 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(702)));
828 assert_eq!(
829 lines[0].relations["product"][0].values.get("id"),
830 Some(&Value::U64(702))
831 );
832 }
833
834 #[test]
835 fn resolved_repository_saves_typed_entity_graph_directly() {
836 let mut ctx = UserContext::new()
837 .with_metadata(
838 InMemoryMetadataStore::new()
839 .with_entity(entity())
840 .with_entity(line_entity())
841 .with_entity(product_entity()),
842 )
843 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
844 .with_internal_id_generator(SequentialIdGenerator::new(800));
845 ctx.insert_resource(PostgresDialect);
846 ctx.insert_resource(StubExecutor {
847 affected: 1,
848 rows: Vec::new(),
849 });
850
851 let repo = ctx
852 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
853 .unwrap();
854 let saved = repo
855 .save_entity_graph(TypedGraphOrder {
856 id: None,
857 version: 1,
858 name: "typed-direct".to_owned(),
859 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
860 id: None,
861 order_id: None,
862 name: "typed-line".to_owned(),
863 product_id: None,
864 product: Some(TypedGraphProduct {
865 id: None,
866 name: "typed-product".to_owned(),
867 }),
868 }]),
869 })
870 .unwrap();
871
872 assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
873 assert_eq!(
874 saved.relations["lines"][0].values.get("order_id"),
875 Some(&Value::U64(800))
876 );
877 assert_eq!(
878 saved.relations["lines"][0].values.get("product_id"),
879 Some(&Value::U64(802))
880 );
881 }
882
883 #[test]
884 fn custom_user_context_can_drive_insert_preparation() {
885 let mut ctx = UserContext::new()
886 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
887 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
888 .with_repository_behavior_registry(
889 InMemoryRepositoryBehaviorRegistry::new()
890 .with_behavior("Order", ContextAwareOrderBehavior),
891 )
892 .with_internal_id_generator(FixedIdGenerator(99));
893 ctx.insert_named_resource("tenant", String::from("acme"));
894 ctx.insert_named_resource("initial_version", 7_i64);
895 ctx.put_local("trace_id", "req-9");
896 ctx.insert_resource(PostgresDialect);
897 ctx.insert_resource(StubExecutor {
898 affected: 1,
899 rows: Vec::new(),
900 });
901
902 let repo = ctx
903 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
904 .unwrap();
905 let prepared = repo.prepare_insert_command(&repo.insert_command()).unwrap();
906
907 assert_eq!(prepared.values.get("id"), Some(&Value::U64(99)));
908 assert_eq!(prepared.values.get("version"), Some(&Value::I64(7)));
909 assert_eq!(
910 prepared.values.get("name"),
911 Some(&Value::Text("acme:req-9".to_owned()))
912 );
913 }
914
915 #[test]
916 fn checker_registry_validates_and_fixes_insert_commands() {
917 let mut ctx = UserContext::new()
918 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
919 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
920 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
921 .with_internal_id_generator(FixedIdGenerator(77));
922 ctx.insert_resource(PostgresDialect);
923 ctx.insert_resource(StubExecutor {
924 affected: 1,
925 rows: Vec::new(),
926 });
927
928 let repo = ctx
929 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
930 .unwrap();
931 let prepared = repo
932 .prepare_insert_command(&repo.insert_command().value("name", "valid"))
933 .unwrap();
934
935 assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
936 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
937 assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
938
939 let error = repo
940 .prepare_insert_command(&repo.insert_command().value("name", "no"))
941 .unwrap_err();
942 match error {
943 RuntimeError::Check(results) => {
944 assert_eq!(results.len(), 1);
945 assert_eq!(results[0].location.to_string(), "name");
946 }
947 other => panic!("unexpected checker error: {other:?}"),
948 }
949 }
950
951 #[test]
952 fn checker_registry_validates_update_commands_without_required_insert_checks() {
953 let mut ctx = UserContext::new()
954 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
955 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
956 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
957 ctx.insert_resource(PostgresDialect);
958 ctx.insert_resource(StubExecutor {
959 affected: 1,
960 rows: Vec::new(),
961 });
962
963 let repo = ctx
964 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
965 .unwrap();
966 repo.update(&repo.update_command(1_u64).value("version", 1_i64))
967 .unwrap();
968
969 let error = repo
970 .update(&repo.update_command(1_u64).value("name", "no"))
971 .unwrap_err();
972 match error {
973 RepositoryError::Runtime(RuntimeError::Check(results)) => {
974 assert_eq!(results.len(), 1);
975 assert_eq!(results[0].location.to_string(), "name");
976 }
977 other => panic!("unexpected checker error: {other:?}"),
978 }
979 }
980
981 #[test]
982 fn built_in_language_translators_cover_fifteen_languages() {
983 assert_eq!(Language::ALL.len(), 15);
984 let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
985 let messages = Language::ALL
986 .iter()
987 .map(|language| translate_check_result(*language, &result))
988 .collect::<Vec<_>>();
989
990 assert!(messages.iter().all(|message| !message.is_empty()));
991 assert!(messages.iter().any(|message| message.contains("required")));
992 assert!(messages.iter().any(|message| message.contains("å¿…å¡«")));
993 assert!(
994 messages
995 .iter()
996 .any(|message| message.contains("obligatoire"))
997 );
998 assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
999 assert_eq!(
1000 Language::from_code("zh-TW"),
1001 Some(Language::TraditionalChinese)
1002 );
1003 }
1004
1005 #[test]
1006 fn user_context_language_switch_translates_checker_errors() {
1007 let mut ctx = UserContext::new()
1008 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1009 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1010 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1011 .with_internal_id_generator(FixedIdGenerator(77))
1012 .with_language(Language::Chinese);
1013 ctx.insert_resource(PostgresDialect);
1014 ctx.insert_resource(StubExecutor {
1015 affected: 1,
1016 rows: Vec::new(),
1017 });
1018
1019 let repo = ctx
1020 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1021 .unwrap();
1022 let error = repo
1023 .prepare_insert_command(&repo.insert_command())
1024 .unwrap_err();
1025 match error {
1026 RuntimeError::Check(results) => {
1027 assert_eq!(results.len(), 1);
1028 assert!(
1029 results[0]
1030 .message
1031 .as_ref()
1032 .is_some_and(|message| message.contains("å¿…å¡«"))
1033 );
1034 }
1035 other => panic!("unexpected checker error: {other:?}"),
1036 }
1037
1038 let mut ctx = UserContext::new().with_language(Language::English);
1039 ctx.set_language_code("es").unwrap();
1040 assert_eq!(ctx.language(), Language::Spanish);
1041 }
1042
1043 #[test]
1044 fn checker_registry_merges_graph_update_fixes_by_object_status() {
1045 let mut ctx = UserContext::new()
1046 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1047 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1048 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1049 ctx.insert_resource(PostgresDialect);
1050 ctx.insert_resource(StubExecutor {
1051 affected: 1,
1052 rows: vec![Record::from([
1053 ("id".to_owned(), Value::U64(1)),
1054 ("version".to_owned(), Value::I64(1)),
1055 ("name".to_owned(), Value::Text("old".to_owned())),
1056 ])],
1057 });
1058
1059 let repo = ctx
1060 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1061 .unwrap();
1062 let saved = repo
1063 .save_graph(
1064 GraphNode::new("Order")
1065 .value("id", 1_u64)
1066 .value("version", 1_i64)
1067 .value("name", "graph-update"),
1068 )
1069 .unwrap();
1070
1071 assert_eq!(
1072 saved.values.get("name"),
1073 Some(&Value::Text("graph-update-checked".to_owned()))
1074 );
1075 assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
1076 assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1077 }
1078
1079 #[test]
1080 fn user_context_event_sink_receives_repository_mutation_events() {
1081 let events = Arc::new(Mutex::new(Vec::new()));
1082 let mut ctx = UserContext::new()
1083 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1084 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1085 .with_internal_id_generator(FixedIdGenerator(88))
1086 .with_event_sink(RecordingEventSink {
1087 events: events.clone(),
1088 });
1089 ctx.insert_resource(PostgresDialect);
1090 ctx.insert_resource(StubExecutor {
1091 affected: 1,
1092 rows: Vec::new(),
1093 });
1094
1095 let repo = ctx
1096 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1097 .unwrap();
1098 repo.insert(&repo.insert_command().value("name", "created"))
1099 .unwrap();
1100 repo.update(
1101 &repo
1102 .update_command(88_u64)
1103 .expected_version(1)
1104 .value("name", "updated"),
1105 )
1106 .unwrap();
1107 repo.delete(&repo.delete_command(88_u64).expected_version(2))
1108 .unwrap();
1109 repo.recover(&repo.recover_command(88_u64, -3)).unwrap();
1110
1111 let events = events.lock().unwrap();
1112 assert_eq!(events.len(), 4);
1113 assert_eq!(events[0].kind, EntityEventKind::Created);
1114 assert_eq!(events[0].entity, "Order");
1115 assert_eq!(events[0].values.get("id"), Some(&Value::U64(88)));
1116 assert_eq!(events[1].kind, EntityEventKind::Updated);
1117 assert_eq!(events[1].values.get("id"), Some(&Value::U64(88)));
1118 assert_eq!(events[1].values.get("version"), Some(&Value::I64(2)));
1119 assert_eq!(events[1].updated_fields, vec!["name".to_owned()]);
1120 assert_eq!(events[2].kind, EntityEventKind::Deleted);
1121 assert_eq!(events[3].kind, EntityEventKind::Recovered);
1122 }
1123
1124 #[test]
1125 fn user_context_event_sink_receives_mixed_graph_mutation_events() {
1126 let events = Arc::new(Mutex::new(Vec::new()));
1127 let mut ctx = UserContext::new()
1128 .with_metadata(
1129 InMemoryMetadataStore::new()
1130 .with_entity(entity())
1131 .with_entity(line_entity())
1132 .with_entity(product_entity()),
1133 )
1134 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1135 .with_event_sink(RecordingEventSink {
1136 events: events.clone(),
1137 });
1138 ctx.insert_resource(PostgresDialect);
1139 ctx.insert_resource(StubExecutor {
1140 affected: 1,
1141 rows: vec![Record::from([
1142 ("id".to_owned(), Value::U64(1)),
1143 ("version".to_owned(), Value::I64(1)),
1144 ("name".to_owned(), Value::Text("old".to_owned())),
1145 ])],
1146 });
1147
1148 let repo = ctx
1149 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1150 .unwrap();
1151 repo.save_graph(
1152 GraphNode::new("Order")
1153 .value("id", 1_u64)
1154 .value("version", 1_i64)
1155 .value("name", "updated")
1156 .relation(
1157 "lines",
1158 GraphNode::new("OrderLine")
1159 .value("name", "line")
1160 .value("product_id", 3_u64),
1161 ),
1162 )
1163 .unwrap();
1164
1165 let events = events.lock().unwrap();
1166 assert_eq!(events.len(), 3);
1167 assert_eq!(events[0].kind, EntityEventKind::Updated);
1168 assert_eq!(events[0].entity, "Order");
1169 assert_eq!(events[1].kind, EntityEventKind::Updated);
1170 assert_eq!(events[1].entity, "OrderLine");
1171 assert_eq!(events[1].values.get("order_id"), Some(&Value::U64(1)));
1172 assert_eq!(events[2].kind, EntityEventKind::Deleted);
1173 assert_eq!(events[2].entity, "OrderLine");
1174 }
1175
1176 #[test]
1177 fn save_graph_builds_plan_grouped_by_entity_and_operation() {
1178 let mut ctx = UserContext::new()
1179 .with_metadata(
1180 InMemoryMetadataStore::new()
1181 .with_entity(entity())
1182 .with_entity(line_entity())
1183 .with_entity(product_entity()),
1184 )
1185 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1186 .with_internal_id_generator(SequentialIdGenerator::new(500));
1187 ctx.insert_resource(PostgresDialect);
1188 ctx.insert_resource(StubExecutor {
1189 affected: 1,
1190 rows: vec![Record::from([
1191 ("id".to_owned(), Value::U64(1)),
1192 ("version".to_owned(), Value::I64(1)),
1193 ("name".to_owned(), Value::Text("old".to_owned())),
1194 ])],
1195 });
1196
1197 let plan = ctx
1198 .plan_for_save_graph::<PostgresDialect, StubExecutor>(
1199 GraphNode::new("Order")
1200 .value("id", 1_u64)
1201 .value("version", 1_i64)
1202 .value("name", "updated")
1203 .relation(
1204 "lines",
1205 GraphNode::new("OrderLine")
1206 .value("name", "new-line-a")
1207 .value("product_id", 2_u64),
1208 )
1209 .relation(
1210 "lines",
1211 GraphNode::new("OrderLine")
1212 .value("name", "new-line-b")
1213 .value("product_id", 2_u64),
1214 )
1215 .relation(
1216 "lines",
1217 GraphNode::new("OrderLine")
1218 .value("id", 5_u64)
1219 .value("version", 1_i64)
1220 .value("name", "same-update-a"),
1221 )
1222 .relation(
1223 "lines",
1224 GraphNode::new("OrderLine")
1225 .value("id", 6_u64)
1226 .value("version", 1_i64)
1227 .value("name", "same-update-b"),
1228 )
1229 .relation(
1230 "lines",
1231 GraphNode::new("OrderLine").value("id", 3_u64).remove(),
1232 )
1233 .relation(
1234 "lines",
1235 GraphNode::new("OrderLine").value("id", 4_u64).reference(),
1236 ),
1237 )
1238 .unwrap();
1239 let counts = plan.grouped_counts();
1240
1241 assert_eq!(
1242 counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
1243 Some(&1)
1244 );
1245 assert_eq!(
1246 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
1247 Some(&2)
1248 );
1249 assert_eq!(
1250 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
1251 Some(&2)
1252 );
1253 assert_eq!(
1254 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Delete)),
1255 Some(&1)
1256 );
1257 assert_eq!(
1258 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Reference)),
1259 Some(&1)
1260 );
1261 let create_batch = plan
1262 .batches
1263 .iter()
1264 .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
1265 .unwrap();
1266 assert_eq!(create_batch.items.len(), 2);
1267 assert_eq!(
1268 create_batch.items[0].values.get("id"),
1269 Some(&Value::U64(500))
1270 );
1271 assert_eq!(
1272 create_batch.items[1].values.get("id"),
1273 Some(&Value::U64(501))
1274 );
1275 let update_batch = plan
1276 .batches
1277 .iter()
1278 .find(|batch| {
1279 batch.entity == "OrderLine"
1280 && batch.kind == GraphMutationKind::Update
1281 && batch.update_fields == vec!["name".to_owned()]
1282 })
1283 .unwrap();
1284 assert_eq!(update_batch.items.len(), 2);
1285 }
1286
1287 #[test]
1288 fn resolved_repository_builds_relation_plans() {
1289 let mut ctx = UserContext::new()
1290 .with_metadata(
1291 InMemoryMetadataStore::new()
1292 .with_entity(entity())
1293 .with_entity(line_entity())
1294 .with_entity(product_entity()),
1295 )
1296 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1297 .with_repository_behavior_registry(
1298 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1299 );
1300 ctx.insert_resource(PostgresDialect);
1301 ctx.insert_resource(StubExecutor {
1302 affected: 1,
1303 rows: Vec::new(),
1304 });
1305
1306 let repo = ctx
1307 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1308 .unwrap();
1309 let plans = repo.relation_plans().unwrap();
1310
1311 assert_eq!(plans.len(), 1);
1312 assert_eq!(plans[0].relation_name, "lines");
1313 assert_eq!(plans[0].target_entity, "OrderLine");
1314 assert_eq!(plans[0].local_key, "id");
1315 assert_eq!(plans[0].foreign_key, "order_id");
1316 assert!(plans[0].many);
1317 }
1318
1319 #[test]
1320 fn resolved_repository_builds_relation_query_from_parent_rows() {
1321 let mut ctx = UserContext::new()
1322 .with_metadata(
1323 InMemoryMetadataStore::new()
1324 .with_entity(entity())
1325 .with_entity(line_entity())
1326 .with_entity(product_entity()),
1327 )
1328 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1329 .with_repository_behavior_registry(
1330 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1331 );
1332 ctx.insert_resource(PostgresDialect);
1333 ctx.insert_resource(StubExecutor {
1334 affected: 1,
1335 rows: Vec::new(),
1336 });
1337
1338 let repo = ctx
1339 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1340 .unwrap();
1341 let parent_rows = vec![
1342 Record::from([(String::from("id"), Value::U64(11))]),
1343 Record::from([(String::from("id"), Value::U64(12))]),
1344 ];
1345
1346 let query = repo.relation_query("lines", &parent_rows).unwrap();
1347 let compiled = repo.compile(&query).unwrap();
1348 assert!(compiled.sql.contains("FROM \"orderline\""));
1349 assert!(compiled.sql.contains("\"order_id\" IN ($1, $2)"));
1350 assert_eq!(compiled.params, vec![Value::U64(11), Value::U64(12)]);
1351 }
1352
1353 #[test]
1354 fn resolved_repository_enhances_parent_rows_with_relations() {
1355 let mut ctx = UserContext::new()
1356 .with_metadata(
1357 InMemoryMetadataStore::new()
1358 .with_entity(entity())
1359 .with_entity(line_entity())
1360 .with_entity(product_entity()),
1361 )
1362 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1363 .with_repository_behavior_registry(
1364 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1365 );
1366 ctx.insert_resource(PostgresDialect);
1367 ctx.insert_resource(StubExecutor {
1368 affected: 1,
1369 rows: vec![
1370 Record::from([
1371 (String::from("id"), Value::U64(101)),
1372 (String::from("order_id"), Value::U64(11)),
1373 (String::from("name"), Value::Text(String::from("l1"))),
1374 ]),
1375 Record::from([
1376 (String::from("id"), Value::U64(102)),
1377 (String::from("order_id"), Value::U64(11)),
1378 (String::from("name"), Value::Text(String::from("l2"))),
1379 ]),
1380 Record::from([
1381 (String::from("id"), Value::U64(201)),
1382 (String::from("order_id"), Value::U64(12)),
1383 (String::from("name"), Value::Text(String::from("l3"))),
1384 ]),
1385 ],
1386 });
1387
1388 let repo = ctx
1389 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1390 .unwrap();
1391 let mut parents = vec![
1392 Record::from([(String::from("id"), Value::U64(11))]),
1393 Record::from([(String::from("id"), Value::U64(12))]),
1394 ];
1395
1396 repo.enhance_relations(&mut parents).unwrap();
1397
1398 match parents[0].get("lines") {
1399 Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
1400 other => panic!("unexpected lines payload: {other:?}"),
1401 }
1402 match parents[1].get("lines") {
1403 Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
1404 other => panic!("unexpected lines payload: {other:?}"),
1405 }
1406 }
1407
1408 #[test]
1409 fn resolved_repository_fetches_smart_list_of_entities() {
1410 let mut ctx = UserContext::new()
1411 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1412 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1413 ctx.insert_resource(PostgresDialect);
1414 ctx.insert_resource(StubExecutor {
1415 affected: 1,
1416 rows: vec![Record::from([
1417 (String::from("id"), Value::U64(7)),
1418 (String::from("version"), Value::I64(2)),
1419 (String::from("name"), Value::Text(String::from("typed"))),
1420 ])],
1421 });
1422
1423 let repo = ctx
1424 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1425 .unwrap();
1426 let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).unwrap();
1427
1428 assert_eq!(rows.len(), 1);
1429 assert_eq!(
1430 rows.first(),
1431 Some(&OrderEntity {
1432 id: 7,
1433 version: 2,
1434 name: String::from("typed"),
1435 })
1436 );
1437 }
1438
1439 #[test]
1440 fn resolved_repository_fetches_smart_list_of_derived_entities() {
1441 let mut ctx = UserContext::new()
1442 .with_metadata(
1443 InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor()),
1444 )
1445 .with_repository_registry(
1446 InMemoryRepositoryRegistry::new().with_entity("CatalogProduct"),
1447 );
1448 ctx.insert_resource(PostgresDialect);
1449 ctx.insert_resource(StubExecutor {
1450 affected: 1,
1451 rows: vec![Record::from([
1452 (String::from("id"), Value::U64(9)),
1453 (String::from("name"), Value::Text(String::from("derived"))),
1454 ])],
1455 });
1456
1457 let repo = ctx
1458 .resolve_repository::<PostgresDialect, StubExecutor>("CatalogProduct")
1459 .unwrap();
1460 let rows = repo
1461 .fetch_entities::<CatalogProductRow>(&repo.select())
1462 .unwrap();
1463
1464 assert_eq!(rows.len(), 1);
1465 assert_eq!(
1466 rows.first(),
1467 Some(&CatalogProductRow {
1468 id: 9,
1469 name: String::from("derived"),
1470 })
1471 );
1472 }
1473
1474 #[test]
1475 fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
1476 let mut ctx = UserContext::new()
1477 .with_metadata(
1478 InMemoryMetadataStore::new()
1479 .with_entity(OrderAggregateDynamic::entity_descriptor()),
1480 )
1481 .with_repository_registry(
1482 InMemoryRepositoryRegistry::new().with_entity("OrderAggregate"),
1483 );
1484 ctx.insert_resource(PostgresDialect);
1485 ctx.insert_resource(StubExecutor {
1486 affected: 1,
1487 rows: vec![Record::from([
1488 (String::from("id"), Value::U64(1)),
1489 (String::from("lineCount"), Value::I64(3)),
1490 (String::from("amount"), Value::F64(18.5)),
1491 ])],
1492 });
1493
1494 let repo = ctx
1495 .resolve_repository::<PostgresDialect, StubExecutor>("OrderAggregate")
1496 .unwrap();
1497 let rows = repo
1498 .fetch_entities::<OrderAggregateDynamic>(&repo.select())
1499 .unwrap();
1500
1501 assert_eq!(rows.len(), 1);
1502 assert_eq!(rows.data[0].id, 1);
1503 assert_eq!(rows.data[0].dynamic.get("lineCount"), Some(&Value::I64(3)));
1504 assert_eq!(rows.data[0].dynamic.get("amount"), Some(&Value::F64(18.5)));
1505 assert_eq!(
1506 rows.into_vec().into_iter().next().unwrap().into_json(),
1507 serde_json::json!({
1508 "id": 1,
1509 "lineCount": 3,
1510 "amount": 18.5
1511 })
1512 );
1513 }
1514
1515 #[test]
1516 fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
1517 let executor = QueueExecutor {
1518 affected: 1,
1519 rows: Mutex::new(VecDeque::from([
1520 vec![
1521 Record::from([
1522 (String::from("id"), Value::U64(1)),
1523 (String::from("version"), Value::I64(1)),
1524 (String::from("name"), Value::Text(String::from("first"))),
1525 ]),
1526 Record::from([
1527 (String::from("id"), Value::U64(2)),
1528 (String::from("version"), Value::I64(1)),
1529 (String::from("name"), Value::Text(String::from("second"))),
1530 ]),
1531 ],
1532 vec![Record::from([
1533 (String::from("order_id"), Value::U64(1)),
1534 (String::from("lineCount"), Value::I64(3)),
1535 ])],
1536 ])),
1537 queries: Mutex::new(Vec::new()),
1538 };
1539 let mut ctx = UserContext::new()
1540 .with_metadata(
1541 InMemoryMetadataStore::new()
1542 .with_entity(entity())
1543 .with_entity(line_entity()),
1544 )
1545 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1546 ctx.insert_resource(PostgresDialect);
1547 ctx.insert_resource(executor);
1548
1549 let repo = ctx
1550 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1551 .unwrap();
1552 let rows = repo
1553 .fetch_all_with_relation_aggregates(
1554 &repo
1555 .select()
1556 .project("id")
1557 .project("version")
1558 .project("name"),
1559 &[RelationAggregate::new(
1560 "lines",
1561 "lineCount",
1562 SelectQuery::new("OrderLine"),
1563 true,
1564 )],
1565 )
1566 .unwrap();
1567
1568 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1569 assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));
1570
1571 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1572 let queries = executor.queries.lock().unwrap();
1573 assert_eq!(queries.len(), 2);
1574 assert_eq!(
1575 queries[1],
1576 "SELECT \"order_id\", COUNT(*) AS \"lineCount\" FROM \"orderline\" WHERE (\"order_id\" IN ($1, $2)) GROUP BY \"order_id\""
1577 );
1578 }
1579
1580 #[test]
1581 fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
1582 let executor = QueueExecutor {
1583 affected: 1,
1584 rows: Mutex::new(VecDeque::from([vec![Record::from([(
1585 String::from("count"),
1586 Value::I64(2),
1587 )])]])),
1588 queries: Mutex::new(Vec::new()),
1589 };
1590 let mut ctx = UserContext::new()
1591 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1592 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1593 ctx.insert_resource(PostgresDialect);
1594 ctx.insert_resource(executor);
1595 ctx.insert_resource(InMemoryAggregationCache::default());
1596
1597 let repo = ctx
1598 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1599 .unwrap();
1600 let query = repo
1601 .select()
1602 .count("count")
1603 .enable_aggregation_cache_for(60_000);
1604
1605 let first = repo.fetch_all(&query).unwrap();
1606 let second = repo.fetch_all(&query).unwrap();
1607
1608 assert_eq!(first, second);
1609 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1610 assert_eq!(executor.queries.lock().unwrap().len(), 1);
1611 }
1612
1613 #[test]
1614 fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
1615 let executor = QueueExecutor {
1616 affected: 1,
1617 rows: Mutex::new(VecDeque::from([
1618 vec![Record::from([(String::from("count"), Value::I64(2))])],
1619 vec![Record::from([(String::from("count"), Value::I64(3))])],
1620 ])),
1621 queries: Mutex::new(Vec::new()),
1622 };
1623 let mut ctx = UserContext::new()
1624 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1625 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1626 ctx.insert_resource(PostgresDialect);
1627 ctx.insert_resource(executor);
1628 ctx.insert_resource(
1629 Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"))
1630 as Arc<dyn AggregationCacheBackend>,
1631 );
1632
1633 let repo = ctx
1634 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1635 .unwrap();
1636 let query = repo
1637 .select()
1638 .count("count")
1639 .enable_aggregation_cache_for(60_000);
1640
1641 let first = repo.fetch_all(&query).unwrap();
1642 let cached = repo.fetch_all(&query).unwrap();
1643 repo.insert(
1644 &InsertCommand::new("Order")
1645 .value("id", 9_u64)
1646 .value("version", 1_i64)
1647 .value("name", "new"),
1648 )
1649 .unwrap();
1650 let refreshed = repo.fetch_all(&query).unwrap();
1651
1652 assert_eq!(first, cached);
1653 assert_ne!(cached, refreshed);
1654 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1655 assert_eq!(executor.queries.lock().unwrap().len(), 2);
1656 }
1657
1658 #[test]
1659 fn aggregation_cache_propagates_to_relation_aggregates() {
1660 let parent_rows = vec![
1661 Record::from([
1662 (String::from("id"), Value::U64(1)),
1663 (String::from("version"), Value::I64(1)),
1664 (String::from("name"), Value::Text(String::from("first"))),
1665 ]),
1666 Record::from([
1667 (String::from("id"), Value::U64(2)),
1668 (String::from("version"), Value::I64(1)),
1669 (String::from("name"), Value::Text(String::from("second"))),
1670 ]),
1671 ];
1672 let aggregate_rows = vec![Record::from([
1673 (String::from("order_id"), Value::U64(1)),
1674 (String::from("lineCount"), Value::I64(3)),
1675 ])];
1676 let executor = QueueExecutor {
1677 affected: 1,
1678 rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
1679 queries: Mutex::new(Vec::new()),
1680 };
1681 let mut ctx = UserContext::new()
1682 .with_metadata(
1683 InMemoryMetadataStore::new()
1684 .with_entity(entity())
1685 .with_entity(line_entity()),
1686 )
1687 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1688 ctx.insert_resource(PostgresDialect);
1689 ctx.insert_resource(executor);
1690 ctx.insert_resource(InMemoryAggregationCache::default());
1691
1692 let repo = ctx
1693 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1694 .unwrap();
1695 let query = repo
1696 .select()
1697 .project("id")
1698 .project("version")
1699 .project("name")
1700 .enable_aggregation_cache_for(60_000)
1701 .propagate_aggregation_cache(60_000);
1702 let aggregate =
1703 RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
1704
1705 let first = repo
1706 .fetch_all_with_relation_aggregates(&query, &[aggregate.clone()])
1707 .unwrap();
1708 let second = repo
1709 .fetch_all_with_relation_aggregates(&query, &[aggregate])
1710 .unwrap();
1711
1712 assert_eq!(first, second);
1713 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1714 assert_eq!(executor.queries.lock().unwrap().len(), 2);
1715 }
1716
1717 #[test]
1718 fn memory_repository_fetches_smart_list_entities_with_query_features() {
1719 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1720 let repository = MemoryRepository::new(metadata).with_rows(
1721 "Order",
1722 vec![
1723 Record::from([
1724 (String::from("id"), Value::U64(1)),
1725 (String::from("version"), Value::I64(1)),
1726 (String::from("name"), Value::Text(String::from("alpha"))),
1727 ]),
1728 Record::from([
1729 (String::from("id"), Value::U64(2)),
1730 (String::from("version"), Value::I64(1)),
1731 (String::from("name"), Value::Text(String::from("beta"))),
1732 ]),
1733 Record::from([
1734 (String::from("id"), Value::U64(3)),
1735 (String::from("version"), Value::I64(1)),
1736 (String::from("name"), Value::Text(String::from("gamma"))),
1737 ]),
1738 ],
1739 );
1740
1741 let query = teaql_core::SelectQuery::new("Order")
1742 .filter(Expr::Binary {
1743 left: Box::new(Expr::column("id")),
1744 op: teaql_core::BinaryOp::Gte,
1745 right: Box::new(Expr::value(2_u64)),
1746 })
1747 .order_by(OrderBy::desc("id"))
1748 .limit(1);
1749
1750 let orders = repository.fetch_entities::<Order>(&query).unwrap();
1751
1752 assert_eq!(orders.ids(), vec![Value::U64(3)]);
1753 assert_eq!(orders.versions(), vec![1]);
1754 assert_eq!(orders.first().unwrap().name, "gamma");
1755 }
1756
1757 #[test]
1758 fn memory_repository_runs_aggregates() {
1759 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1760 let repository = MemoryRepository::new(metadata).with_rows(
1761 "Order",
1762 vec![
1763 Record::from([
1764 (String::from("id"), Value::U64(1)),
1765 (String::from("version"), Value::I64(1)),
1766 (String::from("name"), Value::Text(String::from("alpha"))),
1767 ]),
1768 Record::from([
1769 (String::from("id"), Value::U64(2)),
1770 (String::from("version"), Value::I64(2)),
1771 (String::from("name"), Value::Text(String::from("beta"))),
1772 ]),
1773 ],
1774 );
1775
1776 let query = teaql_core::SelectQuery {
1777 entity: String::from("Order"),
1778 projection: Vec::new(),
1779 expr_projection: Vec::new(),
1780 filter: None,
1781 having: None,
1782 order_by: Vec::new(),
1783 slice: None,
1784 aggregates: vec![
1785 Aggregate {
1786 function: AggregateFunction::Count,
1787 field: String::from("id"),
1788 alias: String::from("count"),
1789 },
1790 Aggregate {
1791 function: AggregateFunction::Sum,
1792 field: String::from("version"),
1793 alias: String::from("versionSum"),
1794 },
1795 ],
1796 group_by: Vec::new(),
1797 relations: Vec::new(),
1798 aggregation_cache: None,
1799 comment: None,
1800 raw_sql: None,
1801 raw_sql_search_criteria: Vec::new(),
1802 json_expr: None,
1803 dynamic_properties: Vec::new(),
1804 raw_projections: Vec::new(),
1805 object_group_bys: Vec::new(),
1806 child_enhancements: Vec::new(),
1807 };
1808
1809 let rows = repository.fetch_all(&query).unwrap();
1810
1811 assert_eq!(rows.len(), 1);
1812 assert_eq!(rows[0].get("count"), Some(&Value::U64(2)));
1813 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
1814 }
1815
1816 #[test]
1817 fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
1818 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1819 let repository = MemoryRepository::new(metadata).with_rows(
1820 "Order",
1821 vec![
1822 Record::from([
1823 (String::from("id"), Value::U64(1)),
1824 (String::from("version"), Value::I64(1)),
1825 (String::from("name"), Value::Text(String::from("alpha"))),
1826 ]),
1827 Record::from([
1828 (String::from("id"), Value::U64(2)),
1829 (String::from("version"), Value::I64(2)),
1830 (String::from("name"), Value::Text(String::from("alpha"))),
1831 ]),
1832 Record::from([
1833 (String::from("id"), Value::U64(3)),
1834 (String::from("version"), Value::I64(3)),
1835 (String::from("name"), Value::Text(String::from("tmp-beta"))),
1836 ]),
1837 ],
1838 );
1839
1840 let rows = repository
1841 .fetch_all(
1842 &teaql_core::SelectQuery::new("Order")
1843 .filter(
1844 Expr::between("version", 1_i64, 3_i64)
1845 .and_expr(Expr::not_like("name", "tmp%"))
1846 .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")])),
1847 )
1848 .group_by("name")
1849 .count("total")
1850 .sum("version", "versionSum"),
1851 )
1852 .unwrap();
1853
1854 assert_eq!(rows.len(), 1);
1855 assert_eq!(
1856 rows[0].get("name"),
1857 Some(&Value::Text(String::from("alpha")))
1858 );
1859 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
1860 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
1861 }
1862
1863 #[test]
1864 fn memory_repository_runs_extended_aggregates_and_having() {
1865 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1866 let repository = MemoryRepository::new(metadata).with_rows(
1867 "Order",
1868 vec![
1869 Record::from([
1870 (String::from("id"), Value::U64(1)),
1871 (String::from("version"), Value::I64(1)),
1872 (String::from("name"), Value::Text(String::from("alpha"))),
1873 ]),
1874 Record::from([
1875 (String::from("id"), Value::U64(2)),
1876 (String::from("version"), Value::I64(3)),
1877 (String::from("name"), Value::Text(String::from("alpha"))),
1878 ]),
1879 Record::from([
1880 (String::from("id"), Value::U64(3)),
1881 (String::from("version"), Value::I64(7)),
1882 (String::from("name"), Value::Text(String::from("beta"))),
1883 ]),
1884 ],
1885 );
1886
1887 let rows = repository
1888 .fetch_all(
1889 &teaql_core::SelectQuery::new("Order")
1890 .group_by("name")
1891 .count("total")
1892 .stddev("version", "stddevVersion")
1893 .var_pop("version", "varPopVersion")
1894 .bit_or("version", "bitOrVersion")
1895 .having(Expr::gt("total", 1_i64)),
1896 )
1897 .unwrap();
1898
1899 assert_eq!(rows.len(), 1);
1900 assert_eq!(
1901 rows[0].get("name"),
1902 Some(&Value::Text(String::from("alpha")))
1903 );
1904 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
1905 assert_eq!(
1906 rows[0].get("stddevVersion").map(Value::to_json_value),
1907 Some(serde_json::Value::String(
1908 "1.4142135623730951454746218583".to_owned()
1909 ))
1910 );
1911 assert_eq!(
1912 rows[0].get("varPopVersion"),
1913 Some(&Value::Decimal(Decimal::ONE))
1914 );
1915 assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(3)));
1916 }
1917
1918 #[test]
1919 fn memory_repository_runs_sound_like_filter() {
1920 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1921 let repository = MemoryRepository::new(metadata).with_rows(
1922 "Order",
1923 vec![
1924 Record::from([
1925 (String::from("id"), Value::U64(1)),
1926 (String::from("version"), Value::I64(1)),
1927 (String::from("name"), Value::Text(String::from("Robert"))),
1928 ]),
1929 Record::from([
1930 (String::from("id"), Value::U64(2)),
1931 (String::from("version"), Value::I64(1)),
1932 (String::from("name"), Value::Text(String::from("Rupert"))),
1933 ]),
1934 Record::from([
1935 (String::from("id"), Value::U64(3)),
1936 (String::from("version"), Value::I64(1)),
1937 (String::from("name"), Value::Text(String::from("Ashcraft"))),
1938 ]),
1939 ],
1940 );
1941
1942 let rows = repository
1943 .fetch_all(
1944 &teaql_core::SelectQuery::new("Order")
1945 .filter(Expr::sound_like("name", "Robert"))
1946 .order_asc("id"),
1947 )
1948 .unwrap();
1949
1950 assert_eq!(rows.len(), 2);
1951 assert_eq!(rows[0].get("name"), Some(&Value::Text("Robert".to_owned())));
1952 assert_eq!(rows[1].get("name"), Some(&Value::Text("Rupert".to_owned())));
1953 }
1954
1955 #[test]
1956 fn memory_repository_runs_java_style_string_match_filters() {
1957 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1958 let repository = MemoryRepository::new(metadata).with_rows(
1959 "Order",
1960 vec![
1961 Record::from([
1962 (String::from("id"), Value::U64(1)),
1963 (String::from("version"), Value::I64(1)),
1964 (String::from("name"), Value::Text(String::from("tea-order"))),
1965 ]),
1966 Record::from([
1967 (String::from("id"), Value::U64(2)),
1968 (String::from("version"), Value::I64(1)),
1969 (
1970 String::from("name"),
1971 Value::Text(String::from("coffee-order")),
1972 ),
1973 ]),
1974 Record::from([
1975 (String::from("id"), Value::U64(3)),
1976 (String::from("version"), Value::I64(1)),
1977 (
1978 String::from("name"),
1979 Value::Text(String::from("tea-archived")),
1980 ),
1981 ]),
1982 ],
1983 );
1984
1985 let rows = repository
1986 .fetch_all(
1987 &teaql_core::SelectQuery::new("Order")
1988 .filter(
1989 Expr::contain("name", "tea")
1990 .and_expr(Expr::begin_with("name", "tea"))
1991 .and_expr(Expr::end_with("name", "order"))
1992 .and_expr(Expr::not_contain("name", "coffee"))
1993 .and_expr(Expr::not_begin_with("name", "archived"))
1994 .and_expr(Expr::not_end_with("name", "draft")),
1995 )
1996 .order_asc("id"),
1997 )
1998 .unwrap();
1999
2000 assert_eq!(rows.len(), 1);
2001 assert_eq!(
2002 rows[0].get("name"),
2003 Some(&Value::Text("tea-order".to_owned()))
2004 );
2005 }
2006
2007 #[test]
2008 fn memory_repository_runs_property_to_property_filters() {
2009 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2010 let repository = MemoryRepository::new(metadata).with_rows(
2011 "Order",
2012 vec![
2013 Record::from([
2014 (String::from("id"), Value::U64(1)),
2015 (String::from("version"), Value::I64(2)),
2016 (String::from("name"), Value::Text(String::from("keep"))),
2017 ]),
2018 Record::from([
2019 (String::from("id"), Value::U64(2)),
2020 (String::from("version"), Value::I64(1)),
2021 (String::from("name"), Value::Text(String::from("skip"))),
2022 ]),
2023 ],
2024 );
2025
2026 let rows = repository
2027 .fetch_all(
2028 &teaql_core::SelectQuery::new("Order")
2029 .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
2030 .order_asc("id"),
2031 )
2032 .unwrap();
2033
2034 assert_eq!(rows.len(), 1);
2035 assert_eq!(rows[0].get("name"), Some(&Value::Text("keep".to_owned())));
2036 }
2037
2038 #[test]
2039 fn memory_repository_supports_mutations_and_optimistic_locking() {
2040 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2041 let repository = MemoryRepository::new(metadata);
2042
2043 repository
2044 .insert(
2045 &InsertCommand::new("Order")
2046 .value("id", 10_u64)
2047 .value("version", 1_i64)
2048 .value("name", "draft"),
2049 )
2050 .unwrap();
2051 repository
2052 .update(
2053 &UpdateCommand::new("Order", 10_u64)
2054 .expected_version(1)
2055 .value("name", "submitted"),
2056 )
2057 .unwrap();
2058
2059 let row = repository
2060 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2061 .unwrap()
2062 .pop()
2063 .unwrap();
2064 assert_eq!(
2065 row.get("name"),
2066 Some(&Value::Text(String::from("submitted")))
2067 );
2068 assert_eq!(row.get("version"), Some(&Value::I64(2)));
2069
2070 let conflict = repository
2071 .update(
2072 &UpdateCommand::new("Order", 10_u64)
2073 .expected_version(1)
2074 .value("name", "stale"),
2075 )
2076 .unwrap_err();
2077 assert!(matches!(
2078 conflict,
2079 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
2080 ));
2081
2082 repository
2083 .delete(&DeleteCommand::new("Order", 10_u64).expected_version(2))
2084 .unwrap();
2085 let row = repository
2086 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2087 .unwrap()
2088 .pop()
2089 .unwrap();
2090 assert_eq!(row.get("version"), Some(&Value::I64(-3)));
2091
2092 repository
2093 .recover(&RecoverCommand::new("Order", 10_u64, -3))
2094 .unwrap();
2095 let row = repository
2096 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2097 .unwrap()
2098 .pop()
2099 .unwrap();
2100 assert_eq!(row.get("version"), Some(&Value::I64(4)));
2101 }
2102}
2103
2104#[cfg(all(test, feature = "sqlx"))]
2105mod sqlx_integration_tests {
2106 use super::sqlx_support::{
2107 MutationExecutorError, PgIdSpaceGenerator, PgMutationExecutor, PgTransactionExecutor,
2108 SqliteIdSpaceGenerator, SqliteMutationExecutor,
2109 };
2110 use super::{
2111 GraphMutationKind, GraphNode, GraphTransactionBoundary, InMemoryMetadataStore,
2112 InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry, QueryExecutor,
2113 RepositoryBehavior, UserContext,
2114 };
2115 use chrono::{NaiveDate, TimeZone, Utc};
2116 use teaql_core::{
2117 DataType, Decimal, DeleteCommand, EntityDescriptor, Expr, InsertCommand,
2118 PropertyDescriptor, Record, RecoverCommand, SelectQuery, UpdateCommand, Value,
2119 };
2120 use teaql_dialect_pg::PostgresDialect;
2121 use teaql_dialect_sqlite::SqliteDialect;
2122 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
2123 use teaql_sql::SqlDialect;
2124 use tokio::runtime::Handle;
2125 use tokio::task::block_in_place;
2126
2127 const ORDER_DEFAULT_PROJECTION: &str = "\"id\", \"version\", \"name\"";
2128
2129 fn entity() -> EntityDescriptor {
2130 EntityDescriptor::new("Order")
2131 .table_name("orders")
2132 .property(
2133 PropertyDescriptor::new("id", DataType::U64)
2134 .column_name("id")
2135 .id()
2136 .not_null(),
2137 )
2138 .property(
2139 PropertyDescriptor::new("version", DataType::I64)
2140 .column_name("version")
2141 .version()
2142 .not_null(),
2143 )
2144 .property(
2145 PropertyDescriptor::new("name", DataType::Text)
2146 .column_name("name")
2147 .not_null(),
2148 )
2149 .relation(
2150 teaql_core::RelationDescriptor::new("lines", "OrderLine")
2151 .local_key("id")
2152 .foreign_key("order_id")
2153 .many(),
2154 )
2155 }
2156
2157 fn sqlite_entity_keep_missing() -> EntityDescriptor {
2158 EntityDescriptor::new("Order")
2159 .table_name("orders")
2160 .property(
2161 PropertyDescriptor::new("id", DataType::U64)
2162 .column_name("id")
2163 .id()
2164 .not_null(),
2165 )
2166 .property(
2167 PropertyDescriptor::new("version", DataType::I64)
2168 .column_name("version")
2169 .version()
2170 .not_null(),
2171 )
2172 .property(
2173 PropertyDescriptor::new("name", DataType::Text)
2174 .column_name("name")
2175 .not_null(),
2176 )
2177 .relation(
2178 teaql_core::RelationDescriptor::new("lines", "OrderLine")
2179 .local_key("id")
2180 .foreign_key("order_id")
2181 .many()
2182 .keep_missing(),
2183 )
2184 }
2185
2186 fn line_entity() -> EntityDescriptor {
2187 EntityDescriptor::new("OrderLine")
2188 .table_name("orderline")
2189 .property(
2190 PropertyDescriptor::new("id", DataType::U64)
2191 .column_name("id")
2192 .id()
2193 .not_null(),
2194 )
2195 .property(
2196 PropertyDescriptor::new("version", DataType::I64)
2197 .column_name("version")
2198 .version(),
2199 )
2200 .property(
2201 PropertyDescriptor::new("order_id", DataType::U64)
2202 .column_name("order_id")
2203 .not_null(),
2204 )
2205 .property(
2206 PropertyDescriptor::new("name", DataType::Text)
2207 .column_name("name")
2208 .not_null(),
2209 )
2210 .property(
2211 PropertyDescriptor::new("product_id", DataType::U64)
2212 .column_name("product_id")
2213 .not_null(),
2214 )
2215 .relation(
2216 teaql_core::RelationDescriptor::new("product", "Product")
2217 .local_key("product_id")
2218 .foreign_key("id"),
2219 )
2220 }
2221
2222 fn product_entity() -> EntityDescriptor {
2223 EntityDescriptor::new("Product")
2224 .table_name("product")
2225 .property(
2226 PropertyDescriptor::new("id", DataType::U64)
2227 .column_name("id")
2228 .id()
2229 .not_null(),
2230 )
2231 .property(
2232 PropertyDescriptor::new("name", DataType::Text)
2233 .column_name("name")
2234 .not_null(),
2235 )
2236 }
2237
2238 fn typed_entity() -> EntityDescriptor {
2239 EntityDescriptor::new("TypedValue")
2240 .table_name("typed_value")
2241 .property(
2242 PropertyDescriptor::new("id", DataType::U64)
2243 .column_name("id")
2244 .id()
2245 .not_null(),
2246 )
2247 .property(
2248 PropertyDescriptor::new("payload", DataType::Json)
2249 .column_name("payload")
2250 .not_null(),
2251 )
2252 .property(
2253 PropertyDescriptor::new("amount", DataType::Decimal)
2254 .column_name("amount")
2255 .not_null(),
2256 )
2257 .property(
2258 PropertyDescriptor::new("birthday", DataType::Date)
2259 .column_name("birthday")
2260 .not_null(),
2261 )
2262 .property(
2263 PropertyDescriptor::new("happened_at", DataType::Timestamp)
2264 .column_name("happened_at")
2265 .not_null(),
2266 )
2267 }
2268
2269 struct OrderBehavior;
2270 struct NestedOrderBehavior;
2271
2272 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2273 #[teaql(entity = "Product", table = "product")]
2274 struct ProductEntityRow {
2275 #[teaql(id)]
2276 id: u64,
2277 name: String,
2278 }
2279
2280 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2281 #[teaql(entity = "OrderLine", table = "orderline")]
2282 struct OrderLineEntityRow {
2283 #[teaql(id)]
2284 id: u64,
2285 #[teaql(column = "order_id")]
2286 order_id: u64,
2287 name: String,
2288 #[teaql(column = "product_id")]
2289 product_id: u64,
2290 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
2291 product: Option<ProductEntityRow>,
2292 }
2293
2294 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2295 #[teaql(entity = "Order", table = "orders")]
2296 struct OrderAggregateRow {
2297 #[teaql(id)]
2298 id: u64,
2299 #[teaql(version)]
2300 version: i64,
2301 name: String,
2302 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
2303 lines: teaql_core::SmartList<OrderLineEntityRow>,
2304 }
2305
2306 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2307 #[teaql(entity = "Order", table = "orders")]
2308 struct Order {
2309 #[teaql(id)]
2310 id: u64,
2311 #[teaql(version)]
2312 version: i64,
2313 name: String,
2314 }
2315
2316 impl RepositoryBehavior for OrderBehavior {
2317 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2318 vec!["lines".to_owned()]
2319 }
2320 }
2321
2322 impl RepositoryBehavior for NestedOrderBehavior {
2323 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2324 vec!["lines.product".to_owned()]
2325 }
2326 }
2327
2328 #[derive(Clone)]
2329 struct SqliteSyncExecutor {
2330 inner: SqliteMutationExecutor,
2331 }
2332
2333 impl SqliteSyncExecutor {
2334 fn new(inner: SqliteMutationExecutor) -> Self {
2335 Self { inner }
2336 }
2337 }
2338
2339 impl QueryExecutor for SqliteSyncExecutor {
2340 type Error = MutationExecutorError;
2341
2342 fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2343 let handle = Handle::current();
2344 block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2345 }
2346
2347 fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2348 let handle = Handle::current();
2349 block_in_place(|| handle.block_on(self.inner.execute(query)))
2350 }
2351
2352 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2353 let handle = Handle::current();
2354 block_in_place(|| handle.block_on(self.inner.begin_transaction()))?;
2355 Ok(GraphTransactionBoundary::Started)
2356 }
2357
2358 fn commit_transaction(&self) -> Result<(), Self::Error> {
2359 let handle = Handle::current();
2360 block_in_place(|| handle.block_on(self.inner.commit_transaction()))
2361 }
2362
2363 fn rollback_transaction(&self) -> Result<(), Self::Error> {
2364 let handle = Handle::current();
2365 block_in_place(|| handle.block_on(self.inner.rollback_transaction()))
2366 }
2367 }
2368
2369 #[derive(Clone)]
2370 struct PgSyncExecutor {
2371 inner: PgMutationExecutor,
2372 }
2373
2374 impl PgSyncExecutor {
2375 fn new(inner: PgMutationExecutor) -> Self {
2376 Self { inner }
2377 }
2378 }
2379
2380 impl QueryExecutor for PgSyncExecutor {
2381 type Error = MutationExecutorError;
2382
2383 fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2384 let handle = Handle::current();
2385 block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2386 }
2387
2388 fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2389 let handle = Handle::current();
2390 block_in_place(|| handle.block_on(self.inner.execute(query)))
2391 }
2392 }
2393
2394 #[derive(Clone)]
2395 struct PgTxSyncExecutor {
2396 inner: PgTransactionExecutor,
2397 }
2398
2399 impl PgTxSyncExecutor {
2400 fn new(inner: PgTransactionExecutor) -> Self {
2401 Self { inner }
2402 }
2403 }
2404
2405 impl QueryExecutor for PgTxSyncExecutor {
2406 type Error = MutationExecutorError;
2407
2408 fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2409 let handle = Handle::current();
2410 block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2411 }
2412
2413 fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2414 let handle = Handle::current();
2415 block_in_place(|| handle.block_on(self.inner.execute(query)))
2416 }
2417
2418 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2419 Ok(GraphTransactionBoundary::AlreadyActive)
2420 }
2421
2422 fn rollback_transaction(&self) -> Result<(), Self::Error> {
2423 let handle = Handle::current();
2424 block_in_place(|| handle.block_on(self.inner.rollback()))
2425 }
2426 }
2427
2428 #[tokio::test]
2429 async fn sqlite_executor_runs_crud_flow() {
2430 use sqlx::sqlite::SqlitePoolOptions;
2431
2432 let pool = SqlitePoolOptions::new()
2433 .max_connections(1)
2434 .connect("sqlite::memory:")
2435 .await
2436 .unwrap();
2437
2438 let executor = SqliteMutationExecutor::new(pool.clone());
2439 let dialect = SqliteDialect;
2440 let entity = entity();
2441 executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2442
2443 let insert = dialect
2444 .compile_insert(
2445 &entity,
2446 &InsertCommand::new("Order")
2447 .value("id", 1_u64)
2448 .value("version", 1_i64)
2449 .value("name", "first"),
2450 )
2451 .unwrap();
2452 assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2453
2454 let select = dialect
2455 .compile_select(
2456 &entity,
2457 &SelectQuery::new("Order")
2458 .project("id")
2459 .project("version")
2460 .project("name")
2461 .filter(Expr::eq("id", 1_u64)),
2462 )
2463 .unwrap();
2464 let rows = executor.fetch_all(&select).await.unwrap();
2465 assert_eq!(rows.len(), 1);
2466 assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
2467 assert_eq!(rows[0].get("version"), Some(&Value::I64(1)));
2468 assert_eq!(rows[0].get("name"), Some(&Value::Text("first".to_owned())));
2469
2470 let update = dialect
2471 .compile_update(
2472 &entity,
2473 &UpdateCommand::new("Order", 1_u64)
2474 .expected_version(1)
2475 .value("name", "second"),
2476 )
2477 .unwrap();
2478 assert_eq!(executor.execute(&update).await.unwrap(), 1);
2479
2480 let after_update = executor.fetch_all(&select).await.unwrap();
2481 assert_eq!(after_update[0].get("version"), Some(&Value::I64(2)));
2482 assert_eq!(
2483 after_update[0].get("name"),
2484 Some(&Value::Text("second".to_owned()))
2485 );
2486
2487 let delete = dialect
2488 .compile_delete(
2489 &entity,
2490 &DeleteCommand::new("Order", 1_u64).expected_version(2),
2491 )
2492 .unwrap();
2493 assert_eq!(executor.execute(&delete).await.unwrap(), 1);
2494
2495 let after_delete = executor.fetch_all(&select).await.unwrap();
2496 assert_eq!(after_delete[0].get("version"), Some(&Value::I64(-3)));
2497
2498 let recover = dialect
2499 .compile_recover(&entity, &RecoverCommand::new("Order", 1_u64, -3))
2500 .unwrap();
2501 assert_eq!(executor.execute(&recover).await.unwrap(), 1);
2502
2503 let after_recover = executor.fetch_all(&select).await.unwrap();
2504 assert_eq!(after_recover[0].get("version"), Some(&Value::I64(4)));
2505 }
2506
2507 #[tokio::test(flavor = "multi_thread")]
2508 async fn sqlite_executor_enhances_relations() {
2509 use sqlx::sqlite::SqlitePoolOptions;
2510
2511 let pool = SqlitePoolOptions::new()
2512 .max_connections(1)
2513 .connect("sqlite::memory:")
2514 .await
2515 .unwrap();
2516
2517 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2518 let order = entity();
2519 let line = line_entity();
2520 mutation_executor
2521 .ensure_schema(&SqliteDialect, &[&order, &line])
2522 .await
2523 .unwrap();
2524
2525 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1'), (2, 1, 'o2')")
2526 .execute(&pool)
2527 .await
2528 .unwrap();
2529 sqlx::query(
2530 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2531 (101, 1, 1001, 'l1'),
2532 (102, 1, 1002, 'l2'),
2533 (201, 2, 1003, 'l3')",
2534 )
2535 .execute(&pool)
2536 .await
2537 .unwrap();
2538
2539 let executor = SqliteSyncExecutor::new(mutation_executor);
2540 let mut ctx = UserContext::new()
2541 .with_metadata(
2542 InMemoryMetadataStore::new()
2543 .with_entity(order)
2544 .with_entity(line)
2545 .with_entity(product_entity()),
2546 )
2547 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2548 .with_repository_behavior_registry(
2549 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
2550 );
2551 ctx.insert_resource(SqliteDialect);
2552 ctx.insert_resource(executor);
2553
2554 let repo = ctx
2555 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2556 .unwrap();
2557 let mut parents = repo
2558 .fetch_all(
2559 &repo
2560 .select()
2561 .project("id")
2562 .project("version")
2563 .project("name")
2564 .order_by(teaql_core::OrderBy::asc("id")),
2565 )
2566 .unwrap();
2567
2568 repo.enhance_relations(&mut parents).unwrap();
2569
2570 assert_eq!(parents.len(), 2);
2571 match parents[0].get("lines") {
2572 Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
2573 other => panic!("unexpected first lines payload: {other:?}"),
2574 }
2575 match parents[1].get("lines") {
2576 Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
2577 other => panic!("unexpected second lines payload: {other:?}"),
2578 }
2579 }
2580
2581 #[tokio::test(flavor = "multi_thread")]
2582 async fn sqlite_executor_enhances_nested_relations() {
2583 use sqlx::sqlite::SqlitePoolOptions;
2584
2585 let pool = SqlitePoolOptions::new()
2586 .max_connections(1)
2587 .connect("sqlite::memory:")
2588 .await
2589 .unwrap();
2590
2591 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2592 let order = entity();
2593 let line = line_entity();
2594 let product = product_entity();
2595 mutation_executor
2596 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2597 .await
2598 .unwrap();
2599
2600 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2601 .execute(&pool)
2602 .await
2603 .unwrap();
2604 sqlx::query(
2605 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2606 (101, 1, 1001, 'l1'),
2607 (102, 1, 1002, 'l2')",
2608 )
2609 .execute(&pool)
2610 .await
2611 .unwrap();
2612 sqlx::query(
2613 "INSERT INTO product (id, name) VALUES
2614 (1001, 'p1'),
2615 (1002, 'p2')",
2616 )
2617 .execute(&pool)
2618 .await
2619 .unwrap();
2620
2621 let executor = SqliteSyncExecutor::new(mutation_executor);
2622 let mut ctx = UserContext::new()
2623 .with_metadata(
2624 InMemoryMetadataStore::new()
2625 .with_entity(order)
2626 .with_entity(line)
2627 .with_entity(product),
2628 )
2629 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2630 .with_repository_behavior_registry(
2631 InMemoryRepositoryBehaviorRegistry::new()
2632 .with_behavior("Order", NestedOrderBehavior),
2633 );
2634 ctx.insert_resource(SqliteDialect);
2635 ctx.insert_resource(executor);
2636
2637 let repo = ctx
2638 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2639 .unwrap();
2640 let mut parents = repo
2641 .fetch_all(
2642 &repo
2643 .select()
2644 .project("id")
2645 .project("version")
2646 .project("name"),
2647 )
2648 .unwrap();
2649
2650 repo.enhance_relations(&mut parents).unwrap();
2651
2652 match parents[0].get("lines") {
2653 Some(Value::List(lines)) => {
2654 assert_eq!(lines.len(), 2);
2655 for line in lines {
2656 match line {
2657 Value::Object(line_record) => match line_record.get("product") {
2658 Some(Value::Object(product)) => {
2659 assert!(product.get("name").is_some());
2660 }
2661 other => panic!("unexpected product payload: {other:?}"),
2662 },
2663 other => panic!("unexpected line payload: {other:?}"),
2664 }
2665 }
2666 }
2667 other => panic!("unexpected nested lines payload: {other:?}"),
2668 }
2669 }
2670
2671 #[tokio::test(flavor = "multi_thread")]
2672 async fn sqlite_executor_enhances_query_relation_with_child_query() {
2673 use sqlx::sqlite::SqlitePoolOptions;
2674
2675 let pool = SqlitePoolOptions::new()
2676 .max_connections(1)
2677 .connect("sqlite::memory:")
2678 .await
2679 .unwrap();
2680
2681 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2682 let order = entity();
2683 let line = line_entity();
2684 let product = product_entity();
2685 mutation_executor
2686 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2687 .await
2688 .unwrap();
2689
2690 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2691 .execute(&pool)
2692 .await
2693 .unwrap();
2694 sqlx::query(
2695 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2696 (101, 1, 1001, 'keep'),
2697 (102, 1, 1002, 'drop')",
2698 )
2699 .execute(&pool)
2700 .await
2701 .unwrap();
2702 sqlx::query(
2703 "INSERT INTO product (id, name) VALUES
2704 (1001, 'p1'),
2705 (1002, 'p2')",
2706 )
2707 .execute(&pool)
2708 .await
2709 .unwrap();
2710
2711 let executor = SqliteSyncExecutor::new(mutation_executor);
2712 let mut ctx = UserContext::new()
2713 .with_metadata(
2714 InMemoryMetadataStore::new()
2715 .with_entity(order)
2716 .with_entity(line)
2717 .with_entity(product),
2718 )
2719 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2720 ctx.insert_resource(SqliteDialect);
2721 ctx.insert_resource(executor);
2722
2723 let repo = ctx
2724 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2725 .unwrap();
2726 let query = repo.select().relation_query(
2727 "lines",
2728 SelectQuery::new("OrderLine")
2729 .project("name")
2730 .filter(Expr::eq("name", "keep"))
2731 .order_desc("id")
2732 .page(0, 10)
2733 .relation_query("product", SelectQuery::new("Product").project("name")),
2734 );
2735 let mut parents = repo
2736 .fetch_all(
2737 &repo
2738 .select()
2739 .project("id")
2740 .project("version")
2741 .project("name"),
2742 )
2743 .unwrap();
2744
2745 repo.enhance_query_relations(&mut parents, &query).unwrap();
2746
2747 match parents[0].get("lines") {
2748 Some(Value::List(lines)) => {
2749 assert_eq!(lines.len(), 1);
2750 let Value::Object(line) = &lines[0] else {
2751 panic!("unexpected line payload: {:?}", lines[0]);
2752 };
2753 assert_eq!(line.get("name"), Some(&Value::Text("keep".to_owned())));
2754 assert_eq!(line.get("order_id"), Some(&Value::I64(1)));
2755 assert_eq!(line.get("product_id"), Some(&Value::I64(1001)));
2756 match line.get("product") {
2757 Some(Value::Object(product)) => {
2758 assert_eq!(product.get("name"), Some(&Value::Text("p1".to_owned())));
2759 assert_eq!(product.get("id"), Some(&Value::I64(1001)));
2760 }
2761 other => panic!("unexpected product payload: {other:?}"),
2762 }
2763 }
2764 other => panic!("unexpected query relation payload: {other:?}"),
2765 }
2766 }
2767
2768 #[tokio::test]
2769 async fn sqlite_executor_ensure_schema_adds_missing_columns() {
2770 use sqlx::Row;
2771 use sqlx::sqlite::SqlitePoolOptions;
2772
2773 let pool = SqlitePoolOptions::new()
2774 .max_connections(1)
2775 .connect("sqlite::memory:")
2776 .await
2777 .unwrap();
2778
2779 sqlx::query(
2780 "CREATE TABLE orders (
2781 id INTEGER PRIMARY KEY
2782 )",
2783 )
2784 .execute(&pool)
2785 .await
2786 .unwrap();
2787
2788 let executor = SqliteMutationExecutor::new(pool.clone());
2789 let dialect = SqliteDialect;
2790 let entity = entity();
2791
2792 executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2793
2794 let columns = sqlx::query("PRAGMA table_info(\"orders\")")
2795 .fetch_all(&pool)
2796 .await
2797 .unwrap()
2798 .into_iter()
2799 .map(|row| row.try_get::<String, _>("name").unwrap())
2800 .collect::<Vec<_>>();
2801
2802 assert!(columns.contains(&"id".to_owned()));
2803 assert!(columns.contains(&"version".to_owned()));
2804 assert!(columns.contains(&"name".to_owned()));
2805 }
2806
2807 #[tokio::test]
2808 async fn user_context_can_ensure_sqlite_schema_from_runtime_module() {
2809 use sqlx::Row;
2810 use sqlx::sqlite::SqlitePoolOptions;
2811
2812 let pool = SqlitePoolOptions::new()
2813 .max_connections(1)
2814 .connect("sqlite::memory:")
2815 .await
2816 .unwrap();
2817
2818 let module = super::RuntimeModule::new()
2819 .descriptor(entity())
2820 .descriptor(line_entity())
2821 .descriptor(product_entity())
2822 .initial_graph(
2823 GraphNode::new("Order")
2824 .value("id", 1_u64)
2825 .value("version", 1_i64)
2826 .value("name", "seed-order"),
2827 );
2828 let mut ctx = super::UserContext::new().with_module(module);
2829 ctx.insert_resource(SqliteDialect);
2830 ctx.insert_resource(SqliteMutationExecutor::new(pool.clone()));
2831
2832 ctx.ensure_sqlite_schema().await.unwrap();
2833 sqlx::query("UPDATE orders SET name = 'stale-seed-order' WHERE id = 1")
2834 .execute(&pool)
2835 .await
2836 .unwrap();
2837 ctx.ensure_sqlite_schema().await.unwrap();
2838
2839 let tables = sqlx::query(
2840 "SELECT name FROM sqlite_master WHERE type = 'table' AND name IN ('orders', 'orderline', 'product') ORDER BY name",
2841 )
2842 .fetch_all(&pool)
2843 .await
2844 .unwrap()
2845 .into_iter()
2846 .map(|row| row.try_get::<String, _>("name").unwrap())
2847 .collect::<Vec<_>>();
2848
2849 assert_eq!(
2850 tables,
2851 vec![
2852 "orderline".to_owned(),
2853 "orders".to_owned(),
2854 "product".to_owned()
2855 ]
2856 );
2857
2858 let seed_count: i64 =
2859 sqlx::query_scalar("SELECT COUNT(1) FROM orders WHERE id = 1 AND name = 'seed-order'")
2860 .fetch_one(&pool)
2861 .await
2862 .unwrap();
2863 assert_eq!(seed_count, 1);
2864 }
2865
2866 #[tokio::test]
2867 async fn sqlite_executor_roundtrips_json_decimal_date_and_timestamp() {
2868 use sqlx::sqlite::SqlitePoolOptions;
2869
2870 let pool = SqlitePoolOptions::new()
2871 .max_connections(1)
2872 .connect("sqlite::memory:")
2873 .await
2874 .unwrap();
2875
2876 let executor = SqliteMutationExecutor::new(pool.clone());
2877 let dialect = SqliteDialect;
2878 let entity = typed_entity();
2879 executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2880
2881 let birthday = NaiveDate::from_ymd_opt(2024, 2, 3).unwrap();
2882 let happened_at = Utc.with_ymd_and_hms(2024, 2, 3, 4, 5, 6).unwrap();
2883 let payload = serde_json::json!({"name": "teaql", "count": 2});
2884 let amount = Decimal::new(12345, 2);
2885
2886 let insert = dialect
2887 .compile_insert(
2888 &entity,
2889 &InsertCommand::new("TypedValue")
2890 .value("id", 1_u64)
2891 .value("payload", payload.clone())
2892 .value("amount", amount)
2893 .value("birthday", birthday)
2894 .value("happened_at", happened_at),
2895 )
2896 .unwrap();
2897 assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2898
2899 let select = dialect
2900 .compile_select(
2901 &entity,
2902 &SelectQuery::new("TypedValue")
2903 .project("id")
2904 .project("payload")
2905 .project("amount")
2906 .project("birthday")
2907 .project("happened_at")
2908 .filter(Expr::eq("id", 1_u64)),
2909 )
2910 .unwrap();
2911 let rows = executor.fetch_all(&select).await.unwrap();
2912 assert_eq!(rows.len(), 1);
2913 assert_eq!(rows[0].get("payload"), Some(&Value::Json(payload)));
2914 assert_eq!(rows[0].get("amount"), Some(&Value::Decimal(amount)));
2915 assert_eq!(rows[0].get("birthday"), Some(&Value::Date(birthday)));
2916 assert_eq!(
2917 rows[0].get("happened_at"),
2918 Some(&Value::Timestamp(happened_at))
2919 );
2920 }
2921
2922 #[tokio::test(flavor = "multi_thread")]
2923 async fn sqlite_fetches_enhanced_typed_entities() {
2924 use sqlx::sqlite::SqlitePoolOptions;
2925
2926 let pool = SqlitePoolOptions::new()
2927 .max_connections(1)
2928 .connect("sqlite::memory:")
2929 .await
2930 .unwrap();
2931
2932 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2933 let order = entity();
2934 let line = line_entity();
2935 let product = product_entity();
2936 mutation_executor
2937 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2938 .await
2939 .unwrap();
2940
2941 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2942 .execute(&pool)
2943 .await
2944 .unwrap();
2945 sqlx::query(
2946 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2947 (101, 1, 1001, 'l1'),
2948 (102, 1, 1002, 'l2')",
2949 )
2950 .execute(&pool)
2951 .await
2952 .unwrap();
2953 sqlx::query(
2954 "INSERT INTO product (id, name) VALUES
2955 (1001, 'p1'),
2956 (1002, 'p2')",
2957 )
2958 .execute(&pool)
2959 .await
2960 .unwrap();
2961
2962 let executor = SqliteSyncExecutor::new(mutation_executor);
2963 let mut ctx = UserContext::new()
2964 .with_metadata(
2965 InMemoryMetadataStore::new()
2966 .with_entity(order)
2967 .with_entity(line)
2968 .with_entity(product),
2969 )
2970 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2971 .with_repository_behavior_registry(
2972 InMemoryRepositoryBehaviorRegistry::new()
2973 .with_behavior("Order", NestedOrderBehavior),
2974 );
2975 ctx.insert_resource(SqliteDialect);
2976 ctx.insert_resource(executor);
2977
2978 let repo = ctx
2979 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2980 .unwrap();
2981 let rows = repo
2982 .fetch_enhanced_entities::<OrderAggregateRow>(
2983 &repo
2984 .select()
2985 .project("id")
2986 .project("version")
2987 .project("name"),
2988 )
2989 .unwrap();
2990
2991 assert_eq!(rows.len(), 1);
2992 let order = rows.first().unwrap();
2993 assert_eq!(order.id, 1);
2994 assert_eq!(order.lines.len(), 2);
2995 assert_eq!(order.lines.data[0].product.as_ref().unwrap().name, "p1");
2996 assert_eq!(order.lines.data[1].product.as_ref().unwrap().name, "p2");
2997 }
2998
2999 #[tokio::test(flavor = "multi_thread")]
3000 async fn sqlite_fetches_typed_smart_list_entities() {
3001 use sqlx::sqlite::SqlitePoolOptions;
3002
3003 let pool = SqlitePoolOptions::new()
3004 .max_connections(1)
3005 .connect("sqlite::memory:")
3006 .await
3007 .unwrap();
3008
3009 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3010 let order = entity();
3011 mutation_executor
3012 .ensure_schema(&SqliteDialect, &[&order])
3013 .await
3014 .unwrap();
3015
3016 sqlx::query(
3017 "INSERT INTO orders (id, version, name) VALUES
3018 (1, 1, 'o1'),
3019 (2, 3, 'o2')",
3020 )
3021 .execute(&pool)
3022 .await
3023 .unwrap();
3024
3025 let executor = SqliteSyncExecutor::new(mutation_executor);
3026 let mut ctx = UserContext::new()
3027 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3028 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3029 ctx.insert_resource(SqliteDialect);
3030 ctx.insert_resource(executor);
3031
3032 let repo = ctx
3033 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3034 .unwrap();
3035 let rows = repo
3036 .fetch_entities::<OrderAggregateRow>(
3037 &repo
3038 .select()
3039 .project("id")
3040 .project("version")
3041 .project("name"),
3042 )
3043 .unwrap();
3044
3045 assert_eq!(rows.len(), 2);
3046 assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3047 assert_eq!(rows.versions(), vec![1, 3]);
3048 assert!(rows.data[0].lines.is_empty());
3049 assert!(rows.data[1].lines.is_empty());
3050 }
3051
3052 #[tokio::test(flavor = "multi_thread")]
3053 async fn sqlite_fetches_smart_list_of_order_entities() {
3054 use sqlx::sqlite::SqlitePoolOptions;
3055
3056 let pool = SqlitePoolOptions::new()
3057 .max_connections(1)
3058 .connect("sqlite::memory:")
3059 .await
3060 .unwrap();
3061
3062 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3063 let order = entity();
3064 mutation_executor
3065 .ensure_schema(&SqliteDialect, &[&order])
3066 .await
3067 .unwrap();
3068
3069 sqlx::query(
3070 "INSERT INTO orders (id, version, name) VALUES
3071 (1, 1, 'o1'),
3072 (2, 3, 'o2')",
3073 )
3074 .execute(&pool)
3075 .await
3076 .unwrap();
3077
3078 let executor = SqliteSyncExecutor::new(mutation_executor);
3079 let mut ctx = UserContext::new()
3080 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3081 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3082 ctx.insert_resource(SqliteDialect);
3083 ctx.insert_resource(executor);
3084
3085 let repo = ctx
3086 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3087 .unwrap();
3088 let rows = repo
3089 .fetch_entities::<Order>(
3090 &repo
3091 .select()
3092 .project("id")
3093 .project("version")
3094 .project("name"),
3095 )
3096 .unwrap();
3097
3098 assert_eq!(
3099 rows.data,
3100 vec![
3101 Order {
3102 id: 1,
3103 version: 1,
3104 name: "o1".to_owned(),
3105 },
3106 Order {
3107 id: 2,
3108 version: 3,
3109 name: "o2".to_owned(),
3110 }
3111 ]
3112 );
3113 assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3114 assert_eq!(rows.versions(), vec![1, 3]);
3115 }
3116
3117 #[tokio::test(flavor = "multi_thread")]
3118 async fn sqlite_insert_generates_missing_id() {
3119 use sqlx::sqlite::SqlitePoolOptions;
3120
3121 let pool = SqlitePoolOptions::new()
3122 .max_connections(1)
3123 .connect("sqlite::memory:")
3124 .await
3125 .unwrap();
3126
3127 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3128 let order = entity();
3129 mutation_executor
3130 .ensure_schema(&SqliteDialect, &[&order])
3131 .await
3132 .unwrap();
3133
3134 let executor = SqliteSyncExecutor::new(mutation_executor);
3135 let mut ctx = UserContext::new()
3136 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3137 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3138 ctx.insert_resource(SqliteDialect);
3139 ctx.insert_resource(executor);
3140
3141 let repo = ctx
3142 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3143 .unwrap();
3144 let affected = repo
3145 .insert(
3146 &repo
3147 .insert_command()
3148 .value("version", 1_i64)
3149 .value("name", "generated"),
3150 )
3151 .unwrap();
3152 assert_eq!(affected, 1);
3153
3154 let rows = repo
3155 .fetch_entities::<Order>(
3156 &repo
3157 .select()
3158 .project("id")
3159 .project("version")
3160 .project("name")
3161 .filter(Expr::eq("name", "generated")),
3162 )
3163 .unwrap();
3164 assert_eq!(rows.len(), 1);
3165 let order = rows.first().unwrap();
3166 assert!(order.id > 0);
3167 assert_eq!(order.version, 1);
3168 assert_eq!(order.name, "generated");
3169 }
3170
3171 #[tokio::test(flavor = "multi_thread")]
3172 async fn sqlite_save_graph_inserts_nested_rows() {
3173 use sqlx::{Row, sqlite::SqlitePoolOptions};
3174
3175 let pool = SqlitePoolOptions::new()
3176 .max_connections(1)
3177 .connect("sqlite::memory:?cache=shared")
3178 .await
3179 .unwrap();
3180
3181 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3182 let order = entity();
3183 let line = line_entity();
3184 let product = product_entity();
3185 mutation_executor
3186 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3187 .await
3188 .unwrap();
3189
3190 let executor = SqliteSyncExecutor::new(mutation_executor);
3191 let mut ctx = UserContext::new()
3192 .with_metadata(
3193 InMemoryMetadataStore::new()
3194 .with_entity(order)
3195 .with_entity(line)
3196 .with_entity(product),
3197 )
3198 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3199 ctx.insert_resource(SqliteDialect);
3200 ctx.insert_resource(executor);
3201
3202 let repo = ctx
3203 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3204 .unwrap();
3205 let graph = GraphNode::new("Order")
3206 .value("id", 1_u64)
3207 .value("version", 1_i64)
3208 .value("name", "root")
3209 .relation(
3210 "lines",
3211 GraphNode::new("OrderLine")
3212 .value("id", 10_u64)
3213 .value("name", "line-1")
3214 .relation(
3215 "product",
3216 GraphNode::new("Product")
3217 .value("id", 100_u64)
3218 .value("name", "sku-1"),
3219 ),
3220 );
3221
3222 let saved = repo.save_graph(graph).unwrap();
3223 assert_eq!(
3224 saved.relations["lines"][0].values.get("order_id"),
3225 Some(&Value::U64(1))
3226 );
3227 assert_eq!(
3228 saved.relations["lines"][0].values.get("product_id"),
3229 Some(&Value::U64(100))
3230 );
3231
3232 let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3233 .fetch_one(&pool)
3234 .await
3235 .unwrap()
3236 .try_get("count")
3237 .unwrap();
3238 let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 10")
3239 .fetch_one(&pool)
3240 .await
3241 .unwrap();
3242 let product_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM product")
3243 .fetch_one(&pool)
3244 .await
3245 .unwrap()
3246 .try_get("count")
3247 .unwrap();
3248
3249 assert_eq!(order_count, 1);
3250 assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 1);
3251 assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 100);
3252 assert_eq!(product_count, 1);
3253 }
3254
3255 #[tokio::test(flavor = "multi_thread")]
3256 async fn sqlite_save_typed_entity_graph_create_inserts_nested_rows() {
3257 use sqlx::{Row, sqlite::SqlitePoolOptions};
3258
3259 let pool = SqlitePoolOptions::new()
3260 .max_connections(1)
3261 .connect("sqlite::memory:")
3262 .await
3263 .unwrap();
3264
3265 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3266 let order = entity();
3267 let line = line_entity();
3268 let product = product_entity();
3269 mutation_executor
3270 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3271 .await
3272 .unwrap();
3273
3274 let executor = SqliteSyncExecutor::new(mutation_executor);
3275 let mut ctx = UserContext::new()
3276 .with_metadata(
3277 InMemoryMetadataStore::new()
3278 .with_entity(order)
3279 .with_entity(line)
3280 .with_entity(product),
3281 )
3282 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3283 ctx.insert_resource(SqliteDialect);
3284 ctx.insert_resource(executor);
3285
3286 let repo = ctx
3287 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3288 .unwrap();
3289 let saved = repo
3290 .save_entity_graph(OrderAggregateRow {
3291 id: 2,
3292 version: 1,
3293 name: "typed-root".to_owned(),
3294 lines: teaql_core::SmartList::from(vec![OrderLineEntityRow {
3295 id: 20,
3296 order_id: 0,
3297 name: "typed-line".to_owned(),
3298 product_id: 200,
3299 product: Some(ProductEntityRow {
3300 id: 200,
3301 name: "typed-sku".to_owned(),
3302 }),
3303 }]),
3304 })
3305 .unwrap();
3306
3307 assert_eq!(saved.values.get("id"), Some(&Value::U64(2)));
3308 assert_eq!(
3309 saved.relations["lines"][0].values.get("order_id"),
3310 Some(&Value::U64(2))
3311 );
3312 assert_eq!(
3313 saved.relations["lines"][0].values.get("product_id"),
3314 Some(&Value::U64(200))
3315 );
3316
3317 let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3318 .fetch_one(&pool)
3319 .await
3320 .unwrap()
3321 .try_get("count")
3322 .unwrap();
3323 let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 20")
3324 .fetch_one(&pool)
3325 .await
3326 .unwrap();
3327 let product_name: String = sqlx::query_scalar("SELECT name FROM product WHERE id = 200")
3328 .fetch_one(&pool)
3329 .await
3330 .unwrap();
3331
3332 assert_eq!(order_count, 1);
3333 assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 2);
3334 assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 200);
3335 assert_eq!(product_name, "typed-sku");
3336 }
3337
3338 #[tokio::test(flavor = "multi_thread")]
3339 async fn sqlite_plan_for_save_graph_assigns_ids_and_batches_before_execution() {
3340 use sqlx::sqlite::SqlitePoolOptions;
3341 use std::time::{SystemTime, UNIX_EPOCH};
3342
3343 let db_path = std::env::temp_dir().join(format!(
3344 "teaql-plan-{}-{}.db",
3345 std::process::id(),
3346 SystemTime::now()
3347 .duration_since(UNIX_EPOCH)
3348 .unwrap()
3349 .as_nanos()
3350 ));
3351 let db_url = format!("sqlite://{}?mode=rwc", db_path.display());
3352 let pool = SqlitePoolOptions::new()
3353 .max_connections(1)
3354 .connect(&db_url)
3355 .await
3356 .unwrap();
3357
3358 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3359 let order = entity();
3360 let line = line_entity();
3361 let product = product_entity();
3362 mutation_executor
3363 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3364 .await
3365 .unwrap();
3366
3367 sqlx::query("INSERT INTO orders (id, version, name) VALUES (100, 1, 'existing')")
3368 .execute(&pool)
3369 .await
3370 .unwrap();
3371 sqlx::query(
3372 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3373 (200, 1, 100, 301, 'line-a'),
3374 (201, 1, 100, 302, 'line-b')",
3375 )
3376 .execute(&pool)
3377 .await
3378 .unwrap();
3379 let seeded_lines: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM orderline")
3380 .fetch_one(&pool)
3381 .await
3382 .unwrap();
3383 assert_eq!(seeded_lines, 2);
3384
3385 let executor = SqliteSyncExecutor::new(mutation_executor);
3386 let mut ctx = UserContext::new()
3387 .with_metadata(
3388 InMemoryMetadataStore::new()
3389 .with_entity(order)
3390 .with_entity(line)
3391 .with_entity(product),
3392 )
3393 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
3394 .with_internal_id_generator(SqliteIdSpaceGenerator::new(pool.clone()));
3395 ctx.insert_resource(SqliteDialect);
3396 ctx.insert_resource(executor);
3397
3398 let graph = GraphNode::new("Order")
3399 .value("id", 100_u64)
3400 .value("version", 1_i64)
3401 .value("name", "existing-updated")
3402 .relation(
3403 "lines",
3404 GraphNode::new("OrderLine")
3405 .value("name", "new-line-a")
3406 .value("product_id", 401_u64),
3407 )
3408 .relation(
3409 "lines",
3410 GraphNode::new("OrderLine")
3411 .value("name", "new-line-b")
3412 .value("product_id", 402_u64),
3413 )
3414 .relation(
3415 "lines",
3416 GraphNode::new("OrderLine")
3417 .value("id", 200_u64)
3418 .value("version", 1_i64)
3419 .value("name", "line-a-updated"),
3420 )
3421 .relation(
3422 "lines",
3423 GraphNode::new("OrderLine")
3424 .value("id", 201_u64)
3425 .value("version", 1_i64)
3426 .value("name", "line-b-updated"),
3427 );
3428
3429 let plan = ctx
3430 .plan_for_save_graph::<SqliteDialect, SqliteSyncExecutor>(graph)
3431 .unwrap();
3432 let counts = plan.grouped_counts();
3433 assert_eq!(
3434 counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
3435 Some(&1)
3436 );
3437 assert_eq!(
3438 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
3439 Some(&2)
3440 );
3441 assert_eq!(
3442 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
3443 Some(&2)
3444 );
3445
3446 let create_batch = plan
3447 .batches
3448 .iter()
3449 .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
3450 .unwrap();
3451 assert_eq!(create_batch.items.len(), 2);
3452 assert_eq!(create_batch.items[0].values.get("id"), Some(&Value::U64(1)));
3453 assert_eq!(create_batch.items[1].values.get("id"), Some(&Value::U64(2)));
3454
3455 let update_batch = plan
3456 .batches
3457 .iter()
3458 .find(|batch| {
3459 batch.entity == "OrderLine"
3460 && batch.kind == GraphMutationKind::Update
3461 && batch.update_fields == vec!["name".to_owned()]
3462 })
3463 .unwrap();
3464 assert_eq!(update_batch.items.len(), 2);
3465
3466 let repo = ctx
3467 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3468 .unwrap();
3469 let saved = repo.execute_graph_plan(plan).unwrap();
3470 let lines = saved.relations.get("lines").unwrap();
3471 assert_eq!(lines.len(), 4);
3472
3473 let generated_count: i64 = sqlx::query_scalar(
3474 "SELECT COUNT(*) FROM orderline WHERE id IN (1, 2) AND order_id = 100",
3475 )
3476 .fetch_one(&pool)
3477 .await
3478 .unwrap();
3479 assert_eq!(generated_count, 2);
3480 let updated_name: String = sqlx::query_scalar("SELECT name FROM orderline WHERE id = 200")
3481 .fetch_one(&pool)
3482 .await
3483 .unwrap();
3484 assert_eq!(updated_name, "line-a-updated");
3485 pool.close().await;
3486 let _ = std::fs::remove_file(db_path);
3487 }
3488
3489 #[tokio::test(flavor = "multi_thread")]
3490 async fn sqlite_save_graph_updates_nested_rows_and_deletes_missing_children() {
3491 use sqlx::{Row, sqlite::SqlitePoolOptions};
3492
3493 let pool = SqlitePoolOptions::new()
3494 .max_connections(1)
3495 .connect("sqlite::memory:")
3496 .await
3497 .unwrap();
3498
3499 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3500 let order = entity();
3501 let line = line_entity();
3502 let product = product_entity();
3503 mutation_executor
3504 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3505 .await
3506 .unwrap();
3507
3508 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'old-root')")
3509 .execute(&pool)
3510 .await
3511 .unwrap();
3512 sqlx::query(
3513 "INSERT INTO product (id, name) VALUES
3514 (100, 'old-sku'),
3515 (101, 'removed-sku')",
3516 )
3517 .execute(&pool)
3518 .await
3519 .unwrap();
3520 sqlx::query(
3521 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
3522 (10, 1, 100, 'old-line'),
3523 (11, 1, 101, 'removed-line')",
3524 )
3525 .execute(&pool)
3526 .await
3527 .unwrap();
3528 sqlx::query("UPDATE orderline SET version = 1")
3529 .execute(&pool)
3530 .await
3531 .unwrap();
3532
3533 let executor = SqliteSyncExecutor::new(mutation_executor);
3534 let mut ctx = UserContext::new()
3535 .with_metadata(
3536 InMemoryMetadataStore::new()
3537 .with_entity(order)
3538 .with_entity(line)
3539 .with_entity(product),
3540 )
3541 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3542 ctx.insert_resource(SqliteDialect);
3543 ctx.insert_resource(executor);
3544
3545 let repo = ctx
3546 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3547 .unwrap();
3548 let graph = GraphNode::new("Order")
3549 .value("id", 1_u64)
3550 .value("version", 1_i64)
3551 .value("name", "new-root")
3552 .relation(
3553 "lines",
3554 GraphNode::new("OrderLine")
3555 .value("id", 10_u64)
3556 .value("version", 1_i64)
3557 .value("name", "new-line")
3558 .relation(
3559 "product",
3560 GraphNode::new("Product")
3561 .value("id", 100_u64)
3562 .value("name", "new-sku"),
3563 ),
3564 )
3565 .relation(
3566 "lines",
3567 GraphNode::new("OrderLine")
3568 .value("id", 12_u64)
3569 .value("version", 1_i64)
3570 .value("name", "added-line")
3571 .relation(
3572 "product",
3573 GraphNode::new("Product")
3574 .value("id", 102_u64)
3575 .value("name", "added-sku"),
3576 ),
3577 );
3578
3579 let saved = repo.save_graph(graph).unwrap();
3580 assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
3581
3582 let order_row = sqlx::query("SELECT version, name FROM orders WHERE id = 1")
3583 .fetch_one(&pool)
3584 .await
3585 .unwrap();
3586 assert_eq!(order_row.try_get::<i64, _>("version").unwrap(), 2);
3587 assert_eq!(order_row.try_get::<String, _>("name").unwrap(), "new-root");
3588
3589 let updated_line =
3590 sqlx::query("SELECT version, product_id, name FROM orderline WHERE id = 10")
3591 .fetch_one(&pool)
3592 .await
3593 .unwrap();
3594 assert_eq!(updated_line.try_get::<i64, _>("version").unwrap(), 2);
3595 assert_eq!(updated_line.try_get::<i64, _>("product_id").unwrap(), 100);
3596 assert_eq!(
3597 updated_line.try_get::<String, _>("name").unwrap(),
3598 "new-line"
3599 );
3600
3601 let added_line =
3602 sqlx::query("SELECT order_id, product_id, name FROM orderline WHERE id = 12")
3603 .fetch_one(&pool)
3604 .await
3605 .unwrap();
3606 assert_eq!(added_line.try_get::<i64, _>("order_id").unwrap(), 1);
3607 assert_eq!(added_line.try_get::<i64, _>("product_id").unwrap(), 102);
3608 assert_eq!(
3609 added_line.try_get::<String, _>("name").unwrap(),
3610 "added-line"
3611 );
3612
3613 let deleted_line = sqlx::query("SELECT version FROM orderline WHERE id = 11")
3614 .fetch_one(&pool)
3615 .await
3616 .unwrap();
3617 assert_eq!(deleted_line.try_get::<i64, _>("version").unwrap(), -2);
3618
3619 let updated_product = sqlx::query("SELECT name FROM product WHERE id = 100")
3620 .fetch_one(&pool)
3621 .await
3622 .unwrap();
3623 assert_eq!(
3624 updated_product.try_get::<String, _>("name").unwrap(),
3625 "new-sku"
3626 );
3627 }
3628
3629 #[tokio::test(flavor = "multi_thread")]
3630 async fn sqlite_save_graph_supports_reference_remove_and_keep_missing() {
3631 use sqlx::{Row, sqlite::SqlitePoolOptions};
3632
3633 let pool = SqlitePoolOptions::new()
3634 .max_connections(1)
3635 .connect("sqlite::memory:")
3636 .await
3637 .unwrap();
3638
3639 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3640 let order = sqlite_entity_keep_missing();
3641 let line = line_entity();
3642 let product = product_entity();
3643 mutation_executor
3644 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3645 .await
3646 .unwrap();
3647
3648 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3649 .execute(&pool)
3650 .await
3651 .unwrap();
3652 sqlx::query("INSERT INTO product (id, name) VALUES (100, 'reference-only')")
3653 .execute(&pool)
3654 .await
3655 .unwrap();
3656 sqlx::query(
3657 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3658 (10, 1, 1, 100, 'remove-me'),
3659 (11, 1, 1, 100, 'keep-me')",
3660 )
3661 .execute(&pool)
3662 .await
3663 .unwrap();
3664
3665 let executor = SqliteSyncExecutor::new(mutation_executor);
3666 let mut ctx = UserContext::new()
3667 .with_metadata(
3668 InMemoryMetadataStore::new()
3669 .with_entity(order)
3670 .with_entity(line)
3671 .with_entity(product),
3672 )
3673 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3674 ctx.insert_resource(SqliteDialect);
3675 ctx.insert_resource(executor);
3676
3677 let repo = ctx
3678 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3679 .unwrap();
3680 let graph = GraphNode::new("Order")
3681 .value("id", 1_u64)
3682 .value("version", 1_i64)
3683 .value("name", "root-updated")
3684 .relation(
3685 "lines",
3686 GraphNode::new("OrderLine")
3687 .value("id", 10_u64)
3688 .value("version", 1_i64)
3689 .remove(),
3690 )
3691 .relation(
3692 "lines",
3693 GraphNode::new("OrderLine")
3694 .value("id", 12_u64)
3695 .value("version", 1_i64)
3696 .value("name", "new-reference-line")
3697 .relation(
3698 "product",
3699 GraphNode::new("Product").value("id", 100_u64).reference(),
3700 ),
3701 );
3702
3703 let saved = repo.save_graph(graph).unwrap();
3704 assert_eq!(
3705 saved.relations["lines"][1].relations["product"][0]
3706 .values
3707 .get("name"),
3708 Some(&Value::Text("reference-only".to_owned()))
3709 );
3710
3711 let removed = sqlx::query("SELECT version FROM orderline WHERE id = 10")
3712 .fetch_one(&pool)
3713 .await
3714 .unwrap();
3715 assert_eq!(removed.try_get::<i64, _>("version").unwrap(), -2);
3716
3717 let kept = sqlx::query("SELECT version, name FROM orderline WHERE id = 11")
3718 .fetch_one(&pool)
3719 .await
3720 .unwrap();
3721 assert_eq!(kept.try_get::<i64, _>("version").unwrap(), 1);
3722 assert_eq!(kept.try_get::<String, _>("name").unwrap(), "keep-me");
3723
3724 let added = sqlx::query("SELECT product_id FROM orderline WHERE id = 12")
3725 .fetch_one(&pool)
3726 .await
3727 .unwrap();
3728 assert_eq!(added.try_get::<i64, _>("product_id").unwrap(), 100);
3729
3730 let product = sqlx::query("SELECT name FROM product WHERE id = 100")
3731 .fetch_one(&pool)
3732 .await
3733 .unwrap();
3734 assert_eq!(
3735 product.try_get::<String, _>("name").unwrap(),
3736 "reference-only"
3737 );
3738 }
3739
3740 #[tokio::test(flavor = "multi_thread")]
3741 async fn sqlite_save_graph_rejects_invalid_reference_and_state_transitions() {
3742 use sqlx::sqlite::SqlitePoolOptions;
3743
3744 let pool = SqlitePoolOptions::new()
3745 .max_connections(1)
3746 .connect("sqlite::memory:")
3747 .await
3748 .unwrap();
3749
3750 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3751 let order = entity();
3752 let line = line_entity();
3753 let product = product_entity();
3754 mutation_executor
3755 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3756 .await
3757 .unwrap();
3758
3759 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3760 .execute(&pool)
3761 .await
3762 .unwrap();
3763 sqlx::query("INSERT INTO product (id, name) VALUES (100, 'valid-reference')")
3764 .execute(&pool)
3765 .await
3766 .unwrap();
3767 sqlx::query(
3768 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3769 (10, 1, 1, 100, 'line-10')",
3770 )
3771 .execute(&pool)
3772 .await
3773 .unwrap();
3774
3775 let executor = SqliteSyncExecutor::new(mutation_executor);
3776 let mut ctx = UserContext::new()
3777 .with_metadata(
3778 InMemoryMetadataStore::new()
3779 .with_entity(order)
3780 .with_entity(line)
3781 .with_entity(product),
3782 )
3783 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3784 ctx.insert_resource(SqliteDialect);
3785 ctx.insert_resource(executor);
3786
3787 let repo = ctx
3788 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3789 .unwrap();
3790
3791 let missing_reference = repo.save_graph(
3792 GraphNode::new("Order").value("id", 1_u64).relation(
3793 "lines",
3794 GraphNode::new("OrderLine")
3795 .value("id", 12_u64)
3796 .value("version", 1_i64)
3797 .value("name", "bad-reference")
3798 .relation(
3799 "product",
3800 GraphNode::new("Product").value("id", 999_u64).reference(),
3801 ),
3802 ),
3803 );
3804 assert!(format!("{missing_reference:?}").contains("does not exist"));
3805
3806 let mutable_reference = repo.save_graph(
3807 GraphNode::new("Order").value("id", 1_u64).relation(
3808 "lines",
3809 GraphNode::new("OrderLine")
3810 .value("id", 12_u64)
3811 .value("version", 1_i64)
3812 .value("name", "mutable-reference")
3813 .relation(
3814 "product",
3815 GraphNode::new("Product")
3816 .value("id", 100_u64)
3817 .value("name", "should-not-mutate")
3818 .reference(),
3819 ),
3820 ),
3821 );
3822 assert!(format!("{mutable_reference:?}").contains("cannot carry mutable field"));
3823
3824 let duplicate_child = repo.save_graph(
3825 GraphNode::new("Order")
3826 .value("id", 1_u64)
3827 .relation(
3828 "lines",
3829 GraphNode::new("OrderLine")
3830 .value("id", 10_u64)
3831 .value("version", 1_i64)
3832 .reference(),
3833 )
3834 .relation(
3835 "lines",
3836 GraphNode::new("OrderLine")
3837 .value("id", 10_u64)
3838 .value("version", 1_i64)
3839 .reference(),
3840 ),
3841 );
3842 assert!(format!("{duplicate_child:?}").contains("duplicate child id"));
3843
3844 let reference_version_conflict = repo.save_graph(
3845 GraphNode::new("Order").value("id", 1_u64).relation(
3846 "lines",
3847 GraphNode::new("OrderLine")
3848 .value("id", 10_u64)
3849 .value("version", 2_i64)
3850 .reference(),
3851 ),
3852 );
3853 assert!(format!("{reference_version_conflict:?}").contains("OptimisticLockConflict"));
3854
3855 let remove_with_child = repo.save_graph(
3856 GraphNode::new("Order").value("id", 1_u64).relation(
3857 "lines",
3858 GraphNode::new("OrderLine")
3859 .value("id", 10_u64)
3860 .value("version", 1_i64)
3861 .relation(
3862 "product",
3863 GraphNode::new("Product").value("id", 100_u64).reference(),
3864 )
3865 .remove(),
3866 ),
3867 );
3868 assert!(format!("{remove_with_child:?}").contains("cannot contain child relations"));
3869
3870 let remove = repo.save_graph(GraphNode::new("Order").value("id", 999_u64).remove());
3871 assert!(format!("{remove:?}").contains("does not exist"));
3872 }
3873
3874 #[tokio::test(flavor = "multi_thread")]
3875 async fn sqlite_graph_write_rolls_back_all_batches_on_failure() {
3876 use sqlx::{Row, sqlite::SqlitePoolOptions};
3877
3878 let pool = SqlitePoolOptions::new()
3879 .max_connections(1)
3880 .connect("sqlite::memory:")
3881 .await
3882 .unwrap();
3883
3884 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3885 let order = entity();
3886 let line = line_entity();
3887 let product = product_entity();
3888 mutation_executor
3889 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3890 .await
3891 .unwrap();
3892
3893 let executor = SqliteSyncExecutor::new(mutation_executor.clone());
3894 let mut ctx = UserContext::new()
3895 .with_metadata(
3896 InMemoryMetadataStore::new()
3897 .with_entity(order)
3898 .with_entity(line)
3899 .with_entity(product),
3900 )
3901 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3902 ctx.insert_resource(SqliteDialect);
3903 ctx.insert_resource(executor);
3904
3905 let repo = ctx
3906 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3907 .unwrap();
3908 let error = repo.save_graph(
3909 GraphNode::new("Order")
3910 .value("id", 1_u64)
3911 .value("version", 1_i64)
3912 .value("name", "rollback-root")
3913 .relation(
3914 "lines",
3915 GraphNode::new("OrderLine")
3916 .value("id", 10_u64)
3917 .value("version", 1_i64)
3918 .value("name", "rollback-line")
3919 .value("product_id", 999_u64)
3920 .relation(
3921 "product",
3922 GraphNode::new("Product").value("id", 999_u64).reference(),
3923 ),
3924 ),
3925 );
3926 assert!(format!("{error:?}").contains("does not exist"));
3927
3928 let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3929 .fetch_one(&pool)
3930 .await
3931 .unwrap()
3932 .try_get("count")
3933 .unwrap();
3934 assert_eq!(count, 0);
3935 }
3936
3937 #[tokio::test]
3938 async fn postgres_executor_ensure_schema_roundtrip_when_database_is_available() {
3939 use sqlx::{PgPool, Row};
3940
3941 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
3942 return;
3943 };
3944
3945 let pool = PgPool::connect(&database_url).await.unwrap();
3946 sqlx::query("DROP TABLE IF EXISTS orderline")
3947 .execute(&pool)
3948 .await
3949 .unwrap();
3950 sqlx::query("DROP TABLE IF EXISTS orders")
3951 .execute(&pool)
3952 .await
3953 .unwrap();
3954
3955 let executor = PgMutationExecutor::new(pool.clone());
3956 let dialect = PostgresDialect;
3957 let order = entity();
3958 let line = line_entity();
3959
3960 executor
3961 .ensure_schema(&dialect, &[&order, &line])
3962 .await
3963 .unwrap();
3964
3965 let tables = sqlx::query(
3966 "SELECT table_name
3967 FROM information_schema.tables
3968 WHERE table_schema = current_schema()
3969 AND table_name IN ('orders', 'orderline')
3970 ORDER BY table_name",
3971 )
3972 .fetch_all(&pool)
3973 .await
3974 .unwrap()
3975 .into_iter()
3976 .map(|row| row.try_get::<String, _>("table_name").unwrap())
3977 .collect::<Vec<_>>();
3978 assert_eq!(tables, vec!["orderline".to_owned(), "orders".to_owned()]);
3979
3980 let soundex: String = sqlx::query_scalar("SELECT soundex('Robert')")
3981 .fetch_one(&pool)
3982 .await
3983 .unwrap();
3984 assert_eq!(soundex, "R163");
3985
3986 sqlx::query(
3987 "INSERT INTO orders (id, version, name) VALUES
3988 (1, 1, 'draft'),
3989 (2, 1, 'submitted'),
3990 (3, 1, 'archived')",
3991 )
3992 .execute(&pool)
3993 .await
3994 .unwrap();
3995
3996 let array_bound_query = dialect
3997 .compile_select(
3998 &order,
3999 &SelectQuery::new("Order")
4000 .filter(
4001 Expr::in_large(
4002 "id",
4003 vec![Value::from(1_u64), Value::from(2_u64), Value::from(3_u64)],
4004 )
4005 .and_expr(Expr::not_in_large("name", vec![Value::from("archived")])),
4006 )
4007 .order_asc("id"),
4008 )
4009 .unwrap();
4010 assert_eq!(
4011 array_bound_query.sql,
4012 format!(
4013 "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE ((\"id\" = ANY($1)) AND (\"name\" <> ALL($2))) ORDER BY \"id\" ASC"
4014 )
4015 );
4016 let rows = executor.fetch_all(&array_bound_query).await.unwrap();
4017 assert_eq!(rows.len(), 2);
4018 assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4019 assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4020
4021 sqlx::query(
4022 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
4023 (11, 1, 1, 101, 'line-1'),
4024 (12, 1, 2, 102, 'line-2'),
4025 (13, 1, 3, 103, 'archived-line')",
4026 )
4027 .execute(&pool)
4028 .await
4029 .unwrap();
4030
4031 let subquery = dialect
4032 .compile_select(
4033 &order,
4034 &SelectQuery::new("Order")
4035 .filter(Expr::in_subquery(
4036 "id",
4037 line.clone(),
4038 SelectQuery::new("OrderLine").filter(Expr::contain("name", "line-")),
4039 "order_id",
4040 ))
4041 .order_asc("id"),
4042 )
4043 .unwrap();
4044 assert_eq!(
4045 subquery.sql,
4046 format!(
4047 "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE (\"id\" IN (SELECT \"order_id\" FROM \"orderline\" WHERE (\"name\" LIKE $1))) ORDER BY \"id\" ASC"
4048 )
4049 );
4050 let rows = executor.fetch_all(&subquery).await.unwrap();
4051 assert_eq!(rows.len(), 2);
4052 assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4053 assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4054
4055 let projected = dialect
4056 .compile_select(
4057 &order,
4058 &SelectQuery::new("Order")
4059 .project("id")
4060 .project_expr("nameSound", Expr::soundex(Expr::column("name")))
4061 .order_gbk_asc("name")
4062 .limit(1),
4063 )
4064 .unwrap();
4065 assert_eq!(
4066 projected.sql,
4067 "SELECT \"id\", SOUNDEX(\"name\") AS \"nameSound\" FROM \"orders\" ORDER BY convert_to(\"name\", 'GBK') ASC LIMIT 1"
4068 );
4069 let rows = executor.fetch_all(&projected).await.unwrap();
4070 assert_eq!(rows.len(), 1);
4071 assert!(rows[0].contains_key("nameSound"));
4072
4073 let aggregate = dialect
4074 .compile_select(
4075 &order,
4076 &SelectQuery::new("Order")
4077 .count("total")
4078 .stddev("version", "stddevVersion")
4079 .var_pop("version", "varPopVersion")
4080 .bit_or("version", "bitOrVersion")
4081 .having(Expr::binary(
4082 Expr::count_all(),
4083 teaql_core::BinaryOp::Gt,
4084 Expr::value(2_i64),
4085 )),
4086 )
4087 .unwrap();
4088 assert_eq!(
4089 aggregate.sql,
4090 "SELECT COUNT(*) AS \"total\", STDDEV(\"version\") AS \"stddevVersion\", VAR_POP(\"version\") AS \"varPopVersion\", BIT_OR(\"version\") AS \"bitOrVersion\" FROM \"orders\" HAVING (COUNT(*) > $1)"
4091 );
4092 let rows = executor.fetch_all(&aggregate).await.unwrap();
4093 assert_eq!(rows.len(), 1);
4094 assert_eq!(rows[0].get("total"), Some(&Value::I64(3)));
4095 assert!(matches!(
4096 rows[0].get("stddevVersion"),
4097 Some(Value::Decimal(_))
4098 ));
4099 assert!(matches!(
4100 rows[0].get("varPopVersion"),
4101 Some(Value::Decimal(_))
4102 ));
4103 assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(1)));
4104
4105 sqlx::query("DROP TABLE orderline")
4106 .execute(&pool)
4107 .await
4108 .unwrap();
4109 sqlx::query("CREATE TABLE orderline (id BIGINT PRIMARY KEY)")
4110 .execute(&pool)
4111 .await
4112 .unwrap();
4113
4114 executor.ensure_schema(&dialect, &[&line]).await.unwrap();
4115
4116 let columns = sqlx::query(
4117 "SELECT column_name
4118 FROM information_schema.columns
4119 WHERE table_schema = current_schema()
4120 AND table_name = 'orderline'
4121 ORDER BY column_name",
4122 )
4123 .fetch_all(&pool)
4124 .await
4125 .unwrap()
4126 .into_iter()
4127 .map(|row| row.try_get::<String, _>("column_name").unwrap())
4128 .collect::<Vec<_>>();
4129 assert!(columns.contains(&"id".to_owned()));
4130 assert!(columns.contains(&"order_id".to_owned()));
4131 assert!(columns.contains(&"product_id".to_owned()));
4132 assert!(columns.contains(&"name".to_owned()));
4133
4134 sqlx::query("DROP TABLE IF EXISTS orderline")
4135 .execute(&pool)
4136 .await
4137 .unwrap();
4138 sqlx::query("DROP TABLE IF EXISTS orders")
4139 .execute(&pool)
4140 .await
4141 .unwrap();
4142 }
4143
4144 #[tokio::test(flavor = "multi_thread")]
4145 async fn postgres_id_space_generator_prepares_repository_insert_when_database_is_available() {
4146 use sqlx::{PgPool, Row};
4147
4148 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4149 return;
4150 };
4151
4152 let pool = PgPool::connect(&database_url).await.unwrap();
4153 sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4154 .execute(&pool)
4155 .await
4156 .unwrap();
4157 sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4158 .execute(&pool)
4159 .await
4160 .unwrap();
4161
4162 let order = entity().table_name("orders_idgen");
4163 let executor = PgMutationExecutor::new(pool.clone());
4164 executor
4165 .ensure_schema(&PostgresDialect, &[&order])
4166 .await
4167 .unwrap();
4168
4169 let mut ctx = UserContext::new()
4170 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
4171 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
4172 .with_internal_id_generator(PgIdSpaceGenerator::new(pool.clone()));
4173 ctx.insert_resource(PostgresDialect);
4174 ctx.insert_resource(PgSyncExecutor::new(executor));
4175
4176 let repo = ctx
4177 .resolve_repository::<PostgresDialect, PgSyncExecutor>("Order")
4178 .unwrap();
4179 repo.insert(
4180 &repo
4181 .insert_command()
4182 .value("version", 1_i64)
4183 .value("name", "generated-1"),
4184 )
4185 .unwrap();
4186 repo.insert(
4187 &repo
4188 .insert_command()
4189 .value("version", 1_i64)
4190 .value("name", "generated-2"),
4191 )
4192 .unwrap();
4193
4194 let ids = sqlx::query("SELECT id FROM orders_idgen ORDER BY id")
4195 .fetch_all(&pool)
4196 .await
4197 .unwrap()
4198 .into_iter()
4199 .map(|row| row.try_get::<i64, _>("id").unwrap())
4200 .collect::<Vec<_>>();
4201 assert_eq!(ids, vec![1, 2]);
4202
4203 let current: i64 = sqlx::query_scalar(
4204 "SELECT current_level FROM teaql_id_space WHERE type_name = 'Order'",
4205 )
4206 .fetch_one(&pool)
4207 .await
4208 .unwrap();
4209 assert_eq!(current, 2);
4210
4211 sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4212 .execute(&pool)
4213 .await
4214 .unwrap();
4215 sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4216 .execute(&pool)
4217 .await
4218 .unwrap();
4219 }
4220
4221 #[tokio::test]
4222 async fn postgres_executor_runs_extended_aggregates_when_database_is_available() {
4223 use sqlx::PgPool;
4224
4225 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4226 return;
4227 };
4228
4229 let pool = PgPool::connect(&database_url).await.unwrap();
4230 sqlx::query("DROP TABLE IF EXISTS orders_agg")
4231 .execute(&pool)
4232 .await
4233 .unwrap();
4234
4235 let order = entity().table_name("orders_agg");
4236 let executor = PgMutationExecutor::new(pool.clone());
4237 let dialect = PostgresDialect;
4238 executor.ensure_schema(&dialect, &[&order]).await.unwrap();
4239
4240 sqlx::query(
4241 "INSERT INTO orders_agg (id, version, name) VALUES
4242 (1, 1, 'A'),
4243 (2, 2, 'A'),
4244 (3, 3, 'B'),
4245 (4, 4, 'B')",
4246 )
4247 .execute(&pool)
4248 .await
4249 .unwrap();
4250
4251 let aggregate = dialect
4252 .compile_select(
4253 &order,
4254 &SelectQuery::new("Order")
4255 .count("total")
4256 .count_field("version", "versionCount")
4257 .sum("version", "sumVersion")
4258 .avg("version", "avgVersion")
4259 .min("version", "minVersion")
4260 .max("version", "maxVersion")
4261 .stddev("version", "stddevVersion")
4262 .stddev_pop("version", "stddevPopVersion")
4263 .var_samp("version", "varSampVersion")
4264 .var_pop("version", "varPopVersion")
4265 .bit_and("version", "bitAndVersion")
4266 .bit_or("version", "bitOrVersion")
4267 .bit_xor("version", "bitXorVersion")
4268 .having(Expr::binary(
4269 Expr::count_all(),
4270 teaql_core::BinaryOp::Gt,
4271 Expr::value(3_i64),
4272 )),
4273 )
4274 .unwrap();
4275 let rows = executor.fetch_all(&aggregate).await.unwrap();
4276 assert_eq!(rows.len(), 1);
4277 let row = &rows[0];
4278 assert_eq!(row.get("total"), Some(&Value::I64(4)));
4279 assert_eq!(row.get("versionCount"), Some(&Value::I64(4)));
4280 assert_eq!(
4281 row.get("sumVersion"),
4282 Some(&Value::Decimal(Decimal::new(10, 0)))
4283 );
4284 assert_eq!(
4285 row.get("avgVersion"),
4286 Some(&Value::Decimal(Decimal::new(25, 1)))
4287 );
4288 assert_eq!(row.get("minVersion"), Some(&Value::I64(1)));
4289 assert_eq!(row.get("maxVersion"), Some(&Value::I64(4)));
4290 assert!(matches!(row.get("stddevVersion"), Some(Value::Decimal(_))));
4291 assert!(matches!(
4292 row.get("stddevPopVersion"),
4293 Some(Value::Decimal(_))
4294 ));
4295 assert!(matches!(row.get("varSampVersion"), Some(Value::Decimal(_))));
4296 assert!(matches!(row.get("varPopVersion"), Some(Value::Decimal(_))));
4297 assert_eq!(row.get("bitAndVersion"), Some(&Value::I64(0)));
4298 assert_eq!(row.get("bitOrVersion"), Some(&Value::I64(7)));
4299 assert_eq!(row.get("bitXorVersion"), Some(&Value::I64(4)));
4300
4301 let grouped = dialect
4302 .compile_select(
4303 &order,
4304 &SelectQuery::new("Order")
4305 .group_by("name")
4306 .count("total")
4307 .sum("version", "sumVersion")
4308 .having(Expr::binary(
4309 Expr::count_all(),
4310 teaql_core::BinaryOp::Gt,
4311 Expr::value(1_i64),
4312 ))
4313 .order_asc("name"),
4314 )
4315 .unwrap();
4316 let rows = executor.fetch_all(&grouped).await.unwrap();
4317 assert_eq!(rows.len(), 2);
4318 assert_eq!(rows[0].get("name"), Some(&Value::Text("A".to_owned())));
4319 assert_eq!(rows[0].get("total"), Some(&Value::I64(2)));
4320 assert_eq!(
4321 rows[0].get("sumVersion"),
4322 Some(&Value::Decimal(Decimal::new(3, 0)))
4323 );
4324 assert_eq!(rows[1].get("name"), Some(&Value::Text("B".to_owned())));
4325 assert_eq!(rows[1].get("total"), Some(&Value::I64(2)));
4326 assert_eq!(
4327 rows[1].get("sumVersion"),
4328 Some(&Value::Decimal(Decimal::new(7, 0)))
4329 );
4330
4331 sqlx::query("DROP TABLE IF EXISTS orders_agg")
4332 .execute(&pool)
4333 .await
4334 .unwrap();
4335 }
4336
4337 #[tokio::test]
4338 async fn user_context_can_ensure_postgres_schema_when_database_is_available() {
4339 use sqlx::{PgPool, Row};
4340
4341 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4342 return;
4343 };
4344
4345 let pool = PgPool::connect(&database_url).await.unwrap();
4346 sqlx::query("DROP TABLE IF EXISTS product_ctx")
4347 .execute(&pool)
4348 .await
4349 .unwrap();
4350 sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4351 .execute(&pool)
4352 .await
4353 .unwrap();
4354 sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4355 .execute(&pool)
4356 .await
4357 .unwrap();
4358
4359 let order = entity().table_name("orders_ctx");
4360 let line = line_entity().table_name("orderline_ctx");
4361 let product = product_entity().table_name("product_ctx");
4362 let module = super::RuntimeModule::new()
4363 .descriptor(order)
4364 .descriptor(line)
4365 .descriptor(product);
4366 let mut ctx = super::UserContext::new().with_module(module);
4367 ctx.insert_resource(PostgresDialect);
4368 ctx.insert_resource(PgMutationExecutor::new(pool.clone()));
4369
4370 ctx.ensure_postgres_schema().await.unwrap();
4371
4372 let tables = sqlx::query(
4373 "SELECT table_name
4374 FROM information_schema.tables
4375 WHERE table_schema = current_schema()
4376 AND table_name IN ('orders_ctx', 'orderline_ctx', 'product_ctx')
4377 ORDER BY table_name",
4378 )
4379 .fetch_all(&pool)
4380 .await
4381 .unwrap()
4382 .into_iter()
4383 .map(|row| row.try_get::<String, _>("table_name").unwrap())
4384 .collect::<Vec<_>>();
4385
4386 assert_eq!(
4387 tables,
4388 vec![
4389 "orderline_ctx".to_owned(),
4390 "orders_ctx".to_owned(),
4391 "product_ctx".to_owned()
4392 ]
4393 );
4394
4395 sqlx::query("DROP TABLE IF EXISTS product_ctx")
4396 .execute(&pool)
4397 .await
4398 .unwrap();
4399 sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4400 .execute(&pool)
4401 .await
4402 .unwrap();
4403 sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4404 .execute(&pool)
4405 .await
4406 .unwrap();
4407 }
4408
4409 #[tokio::test(flavor = "multi_thread")]
4410 async fn postgres_graph_write_transaction_rolls_back_when_database_is_available() {
4411 use sqlx::{PgPool, Row};
4412
4413 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4414 return;
4415 };
4416
4417 let pool = PgPool::connect(&database_url).await.unwrap();
4418 sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4419 .execute(&pool)
4420 .await
4421 .unwrap();
4422 sqlx::query("DROP TABLE IF EXISTS product_tx")
4423 .execute(&pool)
4424 .await
4425 .unwrap();
4426 sqlx::query("DROP TABLE IF EXISTS orders_tx")
4427 .execute(&pool)
4428 .await
4429 .unwrap();
4430
4431 let order = entity().table_name("orders_tx");
4432 let line = line_entity().table_name("orderline_tx");
4433 let product = product_entity().table_name("product_tx");
4434 let schema_executor = PgMutationExecutor::new(pool.clone());
4435 schema_executor
4436 .ensure_schema(&PostgresDialect, &[&order, &line, &product])
4437 .await
4438 .unwrap();
4439
4440 let tx_executor = PgTransactionExecutor::begin(&pool).await.unwrap();
4441 let sync_executor = PgTxSyncExecutor::new(tx_executor.clone());
4442 let mut ctx = UserContext::new()
4443 .with_metadata(
4444 InMemoryMetadataStore::new()
4445 .with_entity(order)
4446 .with_entity(line)
4447 .with_entity(product),
4448 )
4449 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
4450 ctx.insert_resource(PostgresDialect);
4451 ctx.insert_resource(sync_executor);
4452
4453 let repo = ctx
4454 .resolve_repository::<PostgresDialect, PgTxSyncExecutor>("Order")
4455 .unwrap();
4456 repo.save_graph(
4457 GraphNode::new("Order")
4458 .value("id", 1_u64)
4459 .value("version", 1_i64)
4460 .value("name", "pg-rollback-root")
4461 .relation(
4462 "lines",
4463 GraphNode::new("OrderLine")
4464 .value("id", 10_u64)
4465 .value("version", 1_i64)
4466 .value("name", "pg-rollback-line")
4467 .relation(
4468 "product",
4469 GraphNode::new("Product")
4470 .value("id", 100_u64)
4471 .value("name", "pg-rollback-sku"),
4472 ),
4473 ),
4474 )
4475 .unwrap();
4476
4477 tx_executor.rollback().await.unwrap();
4478
4479 let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders_tx")
4480 .fetch_one(&pool)
4481 .await
4482 .unwrap()
4483 .try_get("count")
4484 .unwrap();
4485 assert_eq!(count, 0);
4486
4487 sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4488 .execute(&pool)
4489 .await
4490 .unwrap();
4491 sqlx::query("DROP TABLE IF EXISTS product_tx")
4492 .execute(&pool)
4493 .await
4494 .unwrap();
4495 sqlx::query("DROP TABLE IF EXISTS orders_tx")
4496 .execute(&pool)
4497 .await
4498 .unwrap();
4499 }
4500}
4501pub use checker::{
4502 CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
4503 CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, clear_record_status,
4504 mark_record_status,
4505};