1mod checker;
2mod context;
3mod entity_runtime;
4mod error;
5mod event;
6mod graph;
7mod id;
8mod language;
9mod memory;
10mod registry;
11mod repository;
12
13pub use context::{SqlLogEntry, SqlLogOperation, SqlLogOptions, UserContext};
14pub use entity_runtime::{ChangeSetStack, EntityChangeSet, EntityKey, EntityRoot, RootContext};
15pub use error::{ContextError, RepositoryError, RuntimeError};
16pub use event::{EntityEvent, EntityEventKind, EntityEventSink, InMemoryEntityEventSink};
17pub use graph::{
18 GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem, GraphNode,
19 GraphOperation, sorted_update_fields,
20};
21pub(crate) use id::local_id_generator;
22pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
23pub use language::{
24 BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
25};
26pub use memory::{MemoryRepository, MemoryRepositoryError};
27pub use registry::{
28 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
29 MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
30 RuntimeModule,
31};
32pub use repository::{
33 AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
34 QueryExecutor, RelationLoadPlan, Repository, ResolvedRepository,
35};
36
37#[cfg(feature = "sqlx")]
38pub mod sqlx_support;
39
40#[cfg(test)]
41mod tests {
42 use std::collections::{BTreeMap, VecDeque};
43 use std::sync::{Arc, Mutex};
44
45 use super::{
46 AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResults,
47 Checker, EntityEvent, EntityEventKind, EntityEventSink, GraphMutationKind, GraphNode,
48 GraphTransactionBoundary, InMemoryAggregationCache, InMemoryCheckerRegistry,
49 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
50 InternalIdGenerator, Language, MemoryRepository, MetadataStore, ObjectLocation,
51 QueryExecutor, Repository, RepositoryBehavior, RepositoryError, RuntimeError,
52 RuntimeModule, SqlLogOperation, SqlLogOptions, UserContext, translate_check_result,
53 };
54 use teaql_core::{
55 Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
56 EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
57 RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
58 };
59 use teaql_dialect_pg::PostgresDialect;
60 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
61 use teaql_sql::CompiledQuery;
62
63 const ORDER_DEFAULT_PROJECTION: &str = "\"id\", \"version\", \"name\"";
64
65 fn entity() -> EntityDescriptor {
66 EntityDescriptor::new("Order")
67 .table_name("orders")
68 .property(
69 PropertyDescriptor::new("id", DataType::U64)
70 .column_name("id")
71 .id()
72 .not_null(),
73 )
74 .property(
75 PropertyDescriptor::new("version", DataType::I64)
76 .column_name("version")
77 .version()
78 .not_null(),
79 )
80 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
81 .relation(
82 teaql_core::RelationDescriptor::new("lines", "OrderLine")
83 .local_key("id")
84 .foreign_key("order_id")
85 .many(),
86 )
87 }
88
89 fn line_entity() -> EntityDescriptor {
90 EntityDescriptor::new("OrderLine")
91 .table_name("orderline")
92 .property(
93 PropertyDescriptor::new("id", DataType::U64)
94 .column_name("id")
95 .id()
96 .not_null(),
97 )
98 .property(
99 PropertyDescriptor::new("version", DataType::I64)
100 .column_name("version")
101 .version(),
102 )
103 .property(
104 PropertyDescriptor::new("order_id", DataType::U64)
105 .column_name("order_id")
106 .not_null(),
107 )
108 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
109 .property(
110 PropertyDescriptor::new("product_id", DataType::U64)
111 .column_name("product_id")
112 .not_null(),
113 )
114 .relation(
115 teaql_core::RelationDescriptor::new("product", "Product")
116 .local_key("product_id")
117 .foreign_key("id"),
118 )
119 }
120
121 fn product_entity() -> EntityDescriptor {
122 EntityDescriptor::new("Product")
123 .table_name("product")
124 .property(
125 PropertyDescriptor::new("id", DataType::U64)
126 .column_name("id")
127 .id()
128 .not_null(),
129 )
130 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
131 }
132
133 #[derive(Debug, Default)]
134 struct StubExecutor {
135 affected: u64,
136 rows: Vec<Record>,
137 }
138
139 #[derive(Debug, Default)]
140 struct QueueExecutor {
141 affected: u64,
142 rows: Mutex<VecDeque<Vec<Record>>>,
143 queries: Mutex<Vec<String>>,
144 }
145
146 struct OrderBehavior;
147
148 #[allow(dead_code)]
149 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
150 #[teaql(entity = "CatalogProduct", table = "catalog_product")]
151 struct CatalogProductRow {
152 #[teaql(id)]
153 id: u64,
154 name: String,
155 }
156
157 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
158 #[teaql(entity = "OrderAggregate", table = "orders")]
159 struct OrderAggregateDynamic {
160 #[teaql(id)]
161 id: u64,
162 #[teaql(dynamic)]
163 dynamic: BTreeMap<String, Value>,
164 }
165
166 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
167 #[teaql(entity = "Product", table = "product")]
168 struct ProductEntityRow {
169 #[teaql(id)]
170 id: u64,
171 name: String,
172 }
173
174 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
175 #[teaql(entity = "OrderLine", table = "orderline")]
176 struct OrderLineEntityRow {
177 #[teaql(id)]
178 id: u64,
179 #[teaql(column = "order_id")]
180 order_id: u64,
181 name: String,
182 #[teaql(column = "product_id")]
183 product_id: u64,
184 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
185 product: Option<ProductEntityRow>,
186 }
187
188 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
189 #[teaql(entity = "Order", table = "orders")]
190 struct OrderAggregateRow {
191 #[teaql(id)]
192 id: u64,
193 #[teaql(version)]
194 version: i64,
195 name: String,
196 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
197 lines: teaql_core::SmartList<OrderLineEntityRow>,
198 }
199
200 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
201 #[teaql(entity = "Order", table = "orders")]
202 struct Order {
203 #[teaql(id)]
204 id: u64,
205 #[teaql(version)]
206 version: i64,
207 name: String,
208 }
209
210 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
211 #[teaql(entity = "Product", table = "product")]
212 struct TypedGraphProduct {
213 #[teaql(id)]
214 id: Option<u64>,
215 name: String,
216 }
217
218 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
219 #[teaql(entity = "OrderLine", table = "orderline")]
220 struct TypedGraphLine {
221 #[teaql(id)]
222 id: Option<u64>,
223 #[teaql(column = "order_id")]
224 order_id: Option<u64>,
225 name: String,
226 #[teaql(column = "product_id")]
227 product_id: Option<u64>,
228 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
229 product: Option<TypedGraphProduct>,
230 }
231
232 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
233 #[teaql(entity = "Order", table = "orders")]
234 struct TypedGraphOrder {
235 #[teaql(id)]
236 id: Option<u64>,
237 #[teaql(version)]
238 version: i64,
239 name: String,
240 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
241 lines: teaql_core::SmartList<TypedGraphLine>,
242 }
243
244 #[derive(Debug, PartialEq, Eq)]
245 struct OrderEntity {
246 id: u64,
247 version: i64,
248 name: String,
249 }
250
251 impl teaql_core::TeaqlEntity for OrderEntity {
252 fn entity_descriptor() -> EntityDescriptor {
253 entity()
254 }
255 }
256
257 impl Entity for OrderEntity {
258 fn from_record(record: Record) -> Result<Self, EntityError> {
259 let id = match record.get("id") {
260 Some(Value::U64(v)) => *v,
261 Some(Value::I64(v)) if *v >= 0 => *v as u64,
262 other => {
263 return Err(EntityError::new(
264 "Order",
265 format!("invalid id field: {other:?}"),
266 ));
267 }
268 };
269 let version = match record.get("version") {
270 Some(Value::I64(v)) => *v,
271 other => {
272 return Err(EntityError::new(
273 "Order",
274 format!("invalid version field: {other:?}"),
275 ));
276 }
277 };
278 let name = match record.get("name") {
279 Some(Value::Text(v)) => v.clone(),
280 other => {
281 return Err(EntityError::new(
282 "Order",
283 format!("invalid name field: {other:?}"),
284 ));
285 }
286 };
287 Ok(Self { id, version, name })
288 }
289
290 fn into_record(self) -> Record {
291 Record::from([
292 (String::from("id"), Value::U64(self.id)),
293 (String::from("version"), Value::I64(self.version)),
294 (String::from("name"), Value::Text(self.name)),
295 ])
296 }
297 }
298
299 #[derive(Debug)]
300 struct StubError;
301
302 impl std::fmt::Display for StubError {
303 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304 write!(f, "stub error")
305 }
306 }
307
308 impl std::error::Error for StubError {}
309
310 impl QueryExecutor for StubExecutor {
311 type Error = StubError;
312
313 fn fetch_all(&self, _query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
314 Ok(self.rows.clone())
315 }
316
317 fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
318 Ok(self.affected)
319 }
320
321 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
322 Ok(GraphTransactionBoundary::Started)
323 }
324 }
325
326 impl QueryExecutor for QueueExecutor {
327 type Error = StubError;
328
329 fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
330 self.queries.lock().unwrap().push(query.sql.clone());
331 Ok(self.rows.lock().unwrap().pop_front().unwrap_or_default())
332 }
333
334 fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
335 Ok(self.affected)
336 }
337 }
338
339 #[test]
340 fn user_context_records_configured_sql_logs() {
341 let mut ctx = UserContext::new()
342 .with_module(crate::module!(Order))
343 .with_sql_log_options(SqlLogOptions::select_only());
344 ctx.insert_resource(PostgresDialect);
345 ctx.insert_resource(StubExecutor {
346 affected: 1,
347 rows: Vec::new(),
348 });
349
350 {
351 let repo = ctx
352 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
353 .unwrap();
354 repo.fetch_all(&SelectQuery::new("Order").filter(Expr::eq("name", "Bob's Shop")))
355 .unwrap();
356 repo.insert(&InsertCommand::new("Order").value("name", "created"))
357 .unwrap();
358 }
359
360 let logs = ctx.sql_logs();
361 assert_eq!(logs.len(), 1);
362 assert_eq!(logs[0].operation, SqlLogOperation::Select);
363 assert_eq!(
364 logs[0].debug_sql,
365 format!(
366 "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE (\"name\" = 'Bob''s Shop')"
367 )
368 );
369
370 ctx.set_sql_log_options(SqlLogOptions::mutation_only());
371 ctx.clear_sql_logs();
372 ctx.resolve_repository::<PostgresDialect, StubExecutor>("Order")
373 .unwrap()
374 .update(
375 &UpdateCommand::new("Order", 1_u64)
376 .value("name", "updated")
377 .expected_version(1),
378 )
379 .unwrap();
380
381 let logs = ctx.sql_logs();
382 assert_eq!(logs.len(), 1);
383 assert_eq!(logs[0].operation, SqlLogOperation::Update);
384 assert!(logs[0].debug_sql.contains("UPDATE \"orders\" SET"));
385 assert!(logs[0].debug_sql.contains("'updated'"));
386 }
387
388 impl RepositoryBehavior for OrderBehavior {
389 fn before_select(
390 &self,
391 _ctx: &UserContext,
392 query: &mut teaql_core::SelectQuery,
393 ) -> Result<(), RuntimeError> {
394 query.filter = Some(Expr::eq("version", 1_i64));
395 Ok(())
396 }
397
398 fn before_insert(
399 &self,
400 _ctx: &UserContext,
401 command: &mut InsertCommand,
402 ) -> Result<(), RuntimeError> {
403 command
404 .values
405 .entry("version".to_owned())
406 .or_insert(Value::I64(1));
407 Ok(())
408 }
409
410 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
411 vec!["lines".to_owned()]
412 }
413 }
414
415 struct ContextAwareOrderBehavior;
416 struct OrderChecker;
417 #[derive(Clone)]
418 struct RecordingEventSink {
419 events: Arc<Mutex<Vec<EntityEvent>>>,
420 }
421
422 impl RepositoryBehavior for ContextAwareOrderBehavior {
423 fn before_insert(
424 &self,
425 ctx: &UserContext,
426 command: &mut InsertCommand,
427 ) -> Result<(), RuntimeError> {
428 let tenant = ctx
429 .get_named_resource::<String>("tenant")
430 .cloned()
431 .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
432 let version = *ctx
433 .get_named_resource::<i64>("initial_version")
434 .ok_or_else(|| {
435 RuntimeError::Behavior("missing initial_version resource".to_owned())
436 })?;
437 let trace_id = match ctx.local("trace_id") {
438 Some(Value::Text(value)) => value.clone(),
439 other => {
440 return Err(RuntimeError::Behavior(format!(
441 "missing trace_id local, got {other:?}"
442 )));
443 }
444 };
445
446 command
447 .values
448 .entry("name".to_owned())
449 .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
450 command
451 .values
452 .entry("version".to_owned())
453 .or_insert(Value::I64(version));
454 Ok(())
455 }
456 }
457
458 impl Checker for OrderChecker {
459 fn entity(&self) -> &str {
460 "Order"
461 }
462
463 fn check_and_fix(
464 &self,
465 _ctx: &UserContext,
466 record: &mut Record,
467 location: &ObjectLocation,
468 results: &mut CheckResults,
469 ) {
470 let status = CheckObjectStatus::from_record(record);
471 if status.is_create() {
472 self.required(record, "name", location, results);
473 record.entry("version".to_owned()).or_insert(Value::I64(1));
474 }
475 if status.is_update()
476 && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
477 {
478 record.insert(
479 "name".to_owned(),
480 Value::Text("graph-update-checked".to_owned()),
481 );
482 }
483 self.min_string_length(record, "name", 3, location, results);
484 }
485 }
486
487 impl EntityEventSink for RecordingEventSink {
488 fn on_event(&self, _ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
489 self.events.lock().unwrap().push(event.clone());
490 Ok(())
491 }
492 }
493
494 struct FixedIdGenerator(u64);
495
496 impl InternalIdGenerator for FixedIdGenerator {
497 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
498 Ok(self.0)
499 }
500 }
501
502 struct SequentialIdGenerator {
503 next: Mutex<u64>,
504 }
505
506 impl SequentialIdGenerator {
507 fn new(next: u64) -> Self {
508 Self {
509 next: Mutex::new(next),
510 }
511 }
512 }
513
514 impl InternalIdGenerator for SequentialIdGenerator {
515 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
516 let mut next = self
517 .next
518 .lock()
519 .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
520 let id = *next;
521 *next += 1;
522 Ok(id)
523 }
524 }
525
526 #[test]
527 fn metadata_store_registers_entities() {
528 let store = InMemoryMetadataStore::new().with_entity(entity());
529 assert!(store.entity("Order").is_some());
530 }
531
532 #[test]
533 fn runtime_module_registers_descriptor_into_context() {
534 let ctx = UserContext::new().with_module(RuntimeModule::new().descriptor(entity()));
535 assert!(ctx.entity("Order").is_some());
536 assert!(ctx.has_repository("Order"));
537 }
538
539 #[test]
540 fn runtime_module_registers_derived_entity_and_behavior() {
541 let ctx = UserContext::new().with_module(
542 RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior),
543 );
544 assert!(ctx.entity("CatalogProduct").is_some());
545 assert!(ctx.has_repository("CatalogProduct"));
546 assert!(ctx.repository_behavior("CatalogProduct").is_some());
547 }
548
549 #[test]
550 fn module_macro_registers_multiple_entities() {
551 let ctx = UserContext::new().with_module(crate::module!(CatalogProductRow));
552 assert!(ctx.entity("CatalogProduct").is_some());
553 assert!(ctx.has_repository("CatalogProduct"));
554 }
555
556 #[test]
557 fn module_macro_registers_entity_behavior_pairs() {
558 let ctx =
559 UserContext::new().with_module(crate::module!(CatalogProductRow => OrderBehavior));
560 assert!(ctx.entity("CatalogProduct").is_some());
561 assert!(ctx.repository_behavior("CatalogProduct").is_some());
562 }
563
564 #[test]
565 fn repository_returns_optimistic_lock_conflict() {
566 let store = InMemoryMetadataStore::new().with_entity(entity());
567 let executor = StubExecutor {
568 affected: 0,
569 rows: Vec::new(),
570 };
571 let repo = Repository::new(&PostgresDialect, &store, &executor);
572
573 let err = repo
574 .update(
575 &UpdateCommand::new("Order", 1_u64)
576 .expected_version(3)
577 .value("name", "next"),
578 )
579 .unwrap_err();
580
581 match err {
582 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. }) => {}
583 other => panic!("unexpected error: {other}"),
584 }
585 }
586
587 #[test]
588 fn user_context_indexes_resources_and_locals() {
589 let mut ctx =
590 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
591 ctx.insert_resource::<u64>(42);
592 ctx.insert_named_resource("tenant", String::from("acme"));
593 ctx.put_local("trace_id", "req-1");
594
595 assert!(ctx.entity("Order").is_some());
596 assert_eq!(ctx.get_resource::<u64>(), Some(&42));
597 assert_eq!(
598 ctx.get_named_resource::<String>("tenant"),
599 Some(&String::from("acme"))
600 );
601 assert_eq!(
602 ctx.local("trace_id"),
603 Some(&Value::Text("req-1".to_owned()))
604 );
605 }
606
607 #[test]
608 fn user_context_builds_context_repository() {
609 let mut ctx =
610 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
611 ctx.insert_resource(PostgresDialect);
612 ctx.insert_resource(StubExecutor {
613 affected: 1,
614 rows: Vec::new(),
615 });
616
617 let repo = ctx.repository::<PostgresDialect, StubExecutor>().unwrap();
618 let affected = repo
619 .update(
620 &UpdateCommand::new("Order", 1_u64)
621 .expected_version(3)
622 .value("name", "next"),
623 )
624 .unwrap();
625
626 assert_eq!(affected, 1);
627 }
628
629 #[test]
630 fn user_context_resolves_repository_by_entity_type() {
631 let mut ctx = UserContext::new()
632 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
633 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
634 ctx.insert_resource(PostgresDialect);
635 ctx.insert_resource(StubExecutor {
636 affected: 1,
637 rows: Vec::new(),
638 });
639
640 let repo = ctx
641 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
642 .unwrap();
643 assert_eq!(repo.entity(), "Order");
644 assert_eq!(repo.select().entity, "Order");
645
646 let affected = repo
647 .insert(
648 &repo
649 .insert_command()
650 .value("id", 1_u64)
651 .value("version", 1_i64)
652 .value("name", "n"),
653 )
654 .unwrap();
655 assert_eq!(affected, 1);
656 }
657
658 #[test]
659 fn resolved_repository_applies_behavior_hooks() {
660 let mut ctx = UserContext::new()
661 .with_metadata(
662 InMemoryMetadataStore::new()
663 .with_entity(entity())
664 .with_entity(line_entity())
665 .with_entity(product_entity()),
666 )
667 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
668 .with_repository_behavior_registry(
669 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
670 );
671 ctx.insert_resource(PostgresDialect);
672 ctx.insert_resource(StubExecutor {
673 affected: 1,
674 rows: Vec::new(),
675 });
676
677 let repo = ctx
678 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
679 .unwrap();
680
681 let compiled = repo.compile(&repo.select()).unwrap();
682 assert!(compiled.sql.contains("WHERE (\"version\" = $1)"));
683
684 let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
685 let affected = repo.insert(&insert).unwrap();
686 assert_eq!(affected, 1);
687 assert_eq!(repo.relation_loads(), vec!["lines".to_owned()]);
688 }
689
690 #[test]
691 fn resolved_repository_prepares_insert_command_with_generated_id() {
692 let mut ctx = UserContext::new()
693 .with_metadata(
694 InMemoryMetadataStore::new()
695 .with_entity(entity())
696 .with_entity(line_entity())
697 .with_entity(product_entity()),
698 )
699 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
700 .with_repository_behavior_registry(
701 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
702 )
703 .with_internal_id_generator(FixedIdGenerator(42));
704 ctx.insert_resource(PostgresDialect);
705 ctx.insert_resource(StubExecutor {
706 affected: 1,
707 rows: Vec::new(),
708 });
709
710 let repo = ctx
711 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
712 .unwrap();
713
714 let prepared = repo
715 .prepare_insert_command(&repo.insert_command().value("name", "n"))
716 .unwrap();
717
718 assert_eq!(prepared.values.get("id"), Some(&Value::U64(42)));
719 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
720 assert_eq!(
721 prepared.values.get("name"),
722 Some(&Value::Text("n".to_owned()))
723 );
724 }
725
726 #[test]
727 fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
728 let mut ctx = UserContext::new()
729 .with_metadata(
730 InMemoryMetadataStore::new()
731 .with_entity(entity())
732 .with_entity(line_entity())
733 .with_entity(product_entity()),
734 )
735 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
736 .with_repository_behavior_registry(
737 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
738 )
739 .with_internal_id_generator(SequentialIdGenerator::new(500));
740 ctx.insert_resource(PostgresDialect);
741 ctx.insert_resource(StubExecutor {
742 affected: 1,
743 rows: Vec::new(),
744 });
745
746 let repo = ctx
747 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
748 .unwrap();
749 let graph = GraphNode::new("Order").value("name", "root").relation(
750 "lines",
751 GraphNode::new("OrderLine")
752 .value("name", "line-1")
753 .relation("product", GraphNode::new("Product").value("name", "sku-1")),
754 );
755
756 let saved = repo.save_graph(graph).unwrap();
757
758 assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
759 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
760 let lines = saved.relations.get("lines").unwrap();
761 assert_eq!(lines.len(), 1);
762 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(501)));
763 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(500)));
764 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(502)));
765 let product = lines[0].relations.get("product").unwrap();
766 assert_eq!(product[0].values.get("id"), Some(&Value::U64(502)));
767 }
768
769 #[test]
770 fn resolved_repository_extracts_and_saves_typed_entity_graph() {
771 let mut ctx = UserContext::new()
772 .with_metadata(
773 InMemoryMetadataStore::new()
774 .with_entity(entity())
775 .with_entity(line_entity())
776 .with_entity(product_entity()),
777 )
778 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
779 .with_internal_id_generator(SequentialIdGenerator::new(700));
780 ctx.insert_resource(PostgresDialect);
781 ctx.insert_resource(StubExecutor {
782 affected: 1,
783 rows: Vec::new(),
784 });
785
786 let repo = ctx
787 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
788 .unwrap();
789 let order = TypedGraphOrder {
790 id: None,
791 version: 1,
792 name: "typed-root".to_owned(),
793 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
794 id: None,
795 order_id: None,
796 name: "typed-line".to_owned(),
797 product_id: None,
798 product: Some(TypedGraphProduct {
799 id: None,
800 name: "typed-product".to_owned(),
801 }),
802 }]),
803 };
804
805 let extracted = repo.graph_node_from_entity(order).unwrap();
806 assert_eq!(extracted.entity, "Order");
807 assert_eq!(
808 extracted.values.get("name"),
809 Some(&Value::Text("typed-root".to_owned()))
810 );
811 assert_eq!(extracted.values.get("id"), Some(&Value::Null));
812 assert_eq!(extracted.relations["lines"].len(), 1);
813 assert_eq!(
814 extracted.relations["lines"][0].values.get("name"),
815 Some(&Value::Text("typed-line".to_owned()))
816 );
817 assert_eq!(
818 extracted.relations["lines"][0].relations["product"].len(),
819 1
820 );
821
822 let saved = repo.save_graph(extracted).unwrap();
823 assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
824 let lines = saved.relations.get("lines").unwrap();
825 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(701)));
826 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(700)));
827 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(702)));
828 assert_eq!(
829 lines[0].relations["product"][0].values.get("id"),
830 Some(&Value::U64(702))
831 );
832 }
833
834 #[test]
835 fn resolved_repository_saves_typed_entity_graph_directly() {
836 let mut ctx = UserContext::new()
837 .with_metadata(
838 InMemoryMetadataStore::new()
839 .with_entity(entity())
840 .with_entity(line_entity())
841 .with_entity(product_entity()),
842 )
843 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
844 .with_internal_id_generator(SequentialIdGenerator::new(800));
845 ctx.insert_resource(PostgresDialect);
846 ctx.insert_resource(StubExecutor {
847 affected: 1,
848 rows: Vec::new(),
849 });
850
851 let repo = ctx
852 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
853 .unwrap();
854 let saved = repo
855 .save_entity_graph(TypedGraphOrder {
856 id: None,
857 version: 1,
858 name: "typed-direct".to_owned(),
859 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
860 id: None,
861 order_id: None,
862 name: "typed-line".to_owned(),
863 product_id: None,
864 product: Some(TypedGraphProduct {
865 id: None,
866 name: "typed-product".to_owned(),
867 }),
868 }]),
869 })
870 .unwrap();
871
872 assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
873 assert_eq!(
874 saved.relations["lines"][0].values.get("order_id"),
875 Some(&Value::U64(800))
876 );
877 assert_eq!(
878 saved.relations["lines"][0].values.get("product_id"),
879 Some(&Value::U64(802))
880 );
881 }
882
883 #[test]
884 fn custom_user_context_can_drive_insert_preparation() {
885 let mut ctx = UserContext::new()
886 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
887 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
888 .with_repository_behavior_registry(
889 InMemoryRepositoryBehaviorRegistry::new()
890 .with_behavior("Order", ContextAwareOrderBehavior),
891 )
892 .with_internal_id_generator(FixedIdGenerator(99));
893 ctx.insert_named_resource("tenant", String::from("acme"));
894 ctx.insert_named_resource("initial_version", 7_i64);
895 ctx.put_local("trace_id", "req-9");
896 ctx.insert_resource(PostgresDialect);
897 ctx.insert_resource(StubExecutor {
898 affected: 1,
899 rows: Vec::new(),
900 });
901
902 let repo = ctx
903 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
904 .unwrap();
905 let prepared = repo.prepare_insert_command(&repo.insert_command()).unwrap();
906
907 assert_eq!(prepared.values.get("id"), Some(&Value::U64(99)));
908 assert_eq!(prepared.values.get("version"), Some(&Value::I64(7)));
909 assert_eq!(
910 prepared.values.get("name"),
911 Some(&Value::Text("acme:req-9".to_owned()))
912 );
913 }
914
915 #[test]
916 fn checker_registry_validates_and_fixes_insert_commands() {
917 let mut ctx = UserContext::new()
918 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
919 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
920 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
921 .with_internal_id_generator(FixedIdGenerator(77));
922 ctx.insert_resource(PostgresDialect);
923 ctx.insert_resource(StubExecutor {
924 affected: 1,
925 rows: Vec::new(),
926 });
927
928 let repo = ctx
929 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
930 .unwrap();
931 let prepared = repo
932 .prepare_insert_command(&repo.insert_command().value("name", "valid"))
933 .unwrap();
934
935 assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
936 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
937 assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
938
939 let error = repo
940 .prepare_insert_command(&repo.insert_command().value("name", "no"))
941 .unwrap_err();
942 match error {
943 RuntimeError::Check(results) => {
944 assert_eq!(results.len(), 1);
945 assert_eq!(results[0].location.to_string(), "name");
946 }
947 other => panic!("unexpected checker error: {other:?}"),
948 }
949 }
950
951 #[test]
952 fn checker_registry_validates_update_commands_without_required_insert_checks() {
953 let mut ctx = UserContext::new()
954 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
955 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
956 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
957 ctx.insert_resource(PostgresDialect);
958 ctx.insert_resource(StubExecutor {
959 affected: 1,
960 rows: Vec::new(),
961 });
962
963 let repo = ctx
964 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
965 .unwrap();
966 repo.update(&repo.update_command(1_u64).value("version", 1_i64))
967 .unwrap();
968
969 let error = repo
970 .update(&repo.update_command(1_u64).value("name", "no"))
971 .unwrap_err();
972 match error {
973 RepositoryError::Runtime(RuntimeError::Check(results)) => {
974 assert_eq!(results.len(), 1);
975 assert_eq!(results[0].location.to_string(), "name");
976 }
977 other => panic!("unexpected checker error: {other:?}"),
978 }
979 }
980
981 #[test]
982 fn built_in_language_translators_cover_fifteen_languages() {
983 assert_eq!(Language::ALL.len(), 15);
984 let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
985 let messages = Language::ALL
986 .iter()
987 .map(|language| translate_check_result(*language, &result))
988 .collect::<Vec<_>>();
989
990 assert!(messages.iter().all(|message| !message.is_empty()));
991 assert!(messages.iter().any(|message| message.contains("required")));
992 assert!(messages.iter().any(|message| message.contains("å¿…å¡«")));
993 assert!(
994 messages
995 .iter()
996 .any(|message| message.contains("obligatoire"))
997 );
998 assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
999 assert_eq!(
1000 Language::from_code("zh-TW"),
1001 Some(Language::TraditionalChinese)
1002 );
1003 }
1004
1005 #[test]
1006 fn user_context_language_switch_translates_checker_errors() {
1007 let mut ctx = UserContext::new()
1008 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1009 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1010 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1011 .with_internal_id_generator(FixedIdGenerator(77))
1012 .with_language(Language::Chinese);
1013 ctx.insert_resource(PostgresDialect);
1014 ctx.insert_resource(StubExecutor {
1015 affected: 1,
1016 rows: Vec::new(),
1017 });
1018
1019 let repo = ctx
1020 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1021 .unwrap();
1022 let error = repo
1023 .prepare_insert_command(&repo.insert_command())
1024 .unwrap_err();
1025 match error {
1026 RuntimeError::Check(results) => {
1027 assert_eq!(results.len(), 1);
1028 assert!(
1029 results[0]
1030 .message
1031 .as_ref()
1032 .is_some_and(|message| message.contains("å¿…å¡«"))
1033 );
1034 }
1035 other => panic!("unexpected checker error: {other:?}"),
1036 }
1037
1038 let mut ctx = UserContext::new().with_language(Language::English);
1039 ctx.set_language_code("es").unwrap();
1040 assert_eq!(ctx.language(), Language::Spanish);
1041 }
1042
1043 #[test]
1044 fn checker_registry_merges_graph_update_fixes_by_object_status() {
1045 let mut ctx = UserContext::new()
1046 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1047 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1048 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1049 ctx.insert_resource(PostgresDialect);
1050 ctx.insert_resource(StubExecutor {
1051 affected: 1,
1052 rows: vec![Record::from([
1053 ("id".to_owned(), Value::U64(1)),
1054 ("version".to_owned(), Value::I64(1)),
1055 ("name".to_owned(), Value::Text("old".to_owned())),
1056 ])],
1057 });
1058
1059 let repo = ctx
1060 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1061 .unwrap();
1062 let saved = repo
1063 .save_graph(
1064 GraphNode::new("Order")
1065 .value("id", 1_u64)
1066 .value("version", 1_i64)
1067 .value("name", "graph-update"),
1068 )
1069 .unwrap();
1070
1071 assert_eq!(
1072 saved.values.get("name"),
1073 Some(&Value::Text("graph-update-checked".to_owned()))
1074 );
1075 assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
1076 assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1077 }
1078
1079 #[test]
1080 fn user_context_event_sink_receives_repository_mutation_events() {
1081 let events = Arc::new(Mutex::new(Vec::new()));
1082 let mut ctx = UserContext::new()
1083 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1084 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1085 .with_internal_id_generator(FixedIdGenerator(88))
1086 .with_event_sink(RecordingEventSink {
1087 events: events.clone(),
1088 });
1089 ctx.insert_resource(PostgresDialect);
1090 ctx.insert_resource(StubExecutor {
1091 affected: 1,
1092 rows: Vec::new(),
1093 });
1094
1095 let repo = ctx
1096 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1097 .unwrap();
1098 repo.insert(&repo.insert_command().value("name", "created"))
1099 .unwrap();
1100 repo.update(
1101 &repo
1102 .update_command(88_u64)
1103 .expected_version(1)
1104 .value("name", "updated"),
1105 )
1106 .unwrap();
1107 repo.delete(&repo.delete_command(88_u64).expected_version(2))
1108 .unwrap();
1109 repo.recover(&repo.recover_command(88_u64, -3)).unwrap();
1110
1111 let events = events.lock().unwrap();
1112 assert_eq!(events.len(), 4);
1113 assert_eq!(events[0].kind, EntityEventKind::Created);
1114 assert_eq!(events[0].entity, "Order");
1115 assert_eq!(events[0].values.get("id"), Some(&Value::U64(88)));
1116 assert_eq!(events[1].kind, EntityEventKind::Updated);
1117 assert_eq!(events[1].values.get("id"), Some(&Value::U64(88)));
1118 assert_eq!(events[1].values.get("version"), Some(&Value::I64(2)));
1119 assert_eq!(events[1].updated_fields, vec!["name".to_owned()]);
1120 assert_eq!(events[2].kind, EntityEventKind::Deleted);
1121 assert_eq!(events[3].kind, EntityEventKind::Recovered);
1122 }
1123
1124 #[test]
1125 fn user_context_event_sink_receives_mixed_graph_mutation_events() {
1126 let events = Arc::new(Mutex::new(Vec::new()));
1127 let mut ctx = UserContext::new()
1128 .with_metadata(
1129 InMemoryMetadataStore::new()
1130 .with_entity(entity())
1131 .with_entity(line_entity())
1132 .with_entity(product_entity()),
1133 )
1134 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1135 .with_event_sink(RecordingEventSink {
1136 events: events.clone(),
1137 });
1138 ctx.insert_resource(PostgresDialect);
1139 ctx.insert_resource(StubExecutor {
1140 affected: 1,
1141 rows: vec![Record::from([
1142 ("id".to_owned(), Value::U64(1)),
1143 ("version".to_owned(), Value::I64(1)),
1144 ("name".to_owned(), Value::Text("old".to_owned())),
1145 ])],
1146 });
1147
1148 let repo = ctx
1149 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1150 .unwrap();
1151 repo.save_graph(
1152 GraphNode::new("Order")
1153 .value("id", 1_u64)
1154 .value("version", 1_i64)
1155 .value("name", "updated")
1156 .relation(
1157 "lines",
1158 GraphNode::new("OrderLine")
1159 .value("name", "line")
1160 .value("product_id", 3_u64),
1161 ),
1162 )
1163 .unwrap();
1164
1165 let events = events.lock().unwrap();
1166 assert_eq!(events.len(), 3);
1167 assert_eq!(events[0].kind, EntityEventKind::Updated);
1168 assert_eq!(events[0].entity, "Order");
1169 assert_eq!(events[1].kind, EntityEventKind::Updated);
1170 assert_eq!(events[1].entity, "OrderLine");
1171 assert_eq!(events[1].values.get("order_id"), Some(&Value::U64(1)));
1172 assert_eq!(events[2].kind, EntityEventKind::Deleted);
1173 assert_eq!(events[2].entity, "OrderLine");
1174 }
1175
1176 #[test]
1177 fn save_graph_builds_plan_grouped_by_entity_and_operation() {
1178 let mut ctx = UserContext::new()
1179 .with_metadata(
1180 InMemoryMetadataStore::new()
1181 .with_entity(entity())
1182 .with_entity(line_entity())
1183 .with_entity(product_entity()),
1184 )
1185 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1186 .with_internal_id_generator(SequentialIdGenerator::new(500));
1187 ctx.insert_resource(PostgresDialect);
1188 ctx.insert_resource(StubExecutor {
1189 affected: 1,
1190 rows: vec![Record::from([
1191 ("id".to_owned(), Value::U64(1)),
1192 ("version".to_owned(), Value::I64(1)),
1193 ("name".to_owned(), Value::Text("old".to_owned())),
1194 ])],
1195 });
1196
1197 let plan = ctx
1198 .plan_for_save_graph::<PostgresDialect, StubExecutor>(
1199 GraphNode::new("Order")
1200 .value("id", 1_u64)
1201 .value("version", 1_i64)
1202 .value("name", "updated")
1203 .relation(
1204 "lines",
1205 GraphNode::new("OrderLine")
1206 .value("name", "new-line-a")
1207 .value("product_id", 2_u64),
1208 )
1209 .relation(
1210 "lines",
1211 GraphNode::new("OrderLine")
1212 .value("name", "new-line-b")
1213 .value("product_id", 2_u64),
1214 )
1215 .relation(
1216 "lines",
1217 GraphNode::new("OrderLine")
1218 .value("id", 5_u64)
1219 .value("version", 1_i64)
1220 .value("name", "same-update-a"),
1221 )
1222 .relation(
1223 "lines",
1224 GraphNode::new("OrderLine")
1225 .value("id", 6_u64)
1226 .value("version", 1_i64)
1227 .value("name", "same-update-b"),
1228 )
1229 .relation(
1230 "lines",
1231 GraphNode::new("OrderLine").value("id", 3_u64).remove(),
1232 )
1233 .relation(
1234 "lines",
1235 GraphNode::new("OrderLine").value("id", 4_u64).reference(),
1236 ),
1237 )
1238 .unwrap();
1239 let counts = plan.grouped_counts();
1240
1241 assert_eq!(
1242 counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
1243 Some(&1)
1244 );
1245 assert_eq!(
1246 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
1247 Some(&2)
1248 );
1249 assert_eq!(
1250 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
1251 Some(&2)
1252 );
1253 assert_eq!(
1254 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Delete)),
1255 Some(&1)
1256 );
1257 assert_eq!(
1258 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Reference)),
1259 Some(&1)
1260 );
1261 let create_batch = plan
1262 .batches
1263 .iter()
1264 .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
1265 .unwrap();
1266 assert_eq!(create_batch.items.len(), 2);
1267 assert_eq!(
1268 create_batch.items[0].values.get("id"),
1269 Some(&Value::U64(500))
1270 );
1271 assert_eq!(
1272 create_batch.items[1].values.get("id"),
1273 Some(&Value::U64(501))
1274 );
1275 let update_batch = plan
1276 .batches
1277 .iter()
1278 .find(|batch| {
1279 batch.entity == "OrderLine"
1280 && batch.kind == GraphMutationKind::Update
1281 && batch.update_fields == vec!["name".to_owned()]
1282 })
1283 .unwrap();
1284 assert_eq!(update_batch.items.len(), 2);
1285 }
1286
1287 #[test]
1288 fn resolved_repository_builds_relation_plans() {
1289 let mut ctx = UserContext::new()
1290 .with_metadata(
1291 InMemoryMetadataStore::new()
1292 .with_entity(entity())
1293 .with_entity(line_entity())
1294 .with_entity(product_entity()),
1295 )
1296 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1297 .with_repository_behavior_registry(
1298 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1299 );
1300 ctx.insert_resource(PostgresDialect);
1301 ctx.insert_resource(StubExecutor {
1302 affected: 1,
1303 rows: Vec::new(),
1304 });
1305
1306 let repo = ctx
1307 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1308 .unwrap();
1309 let plans = repo.relation_plans().unwrap();
1310
1311 assert_eq!(plans.len(), 1);
1312 assert_eq!(plans[0].relation_name, "lines");
1313 assert_eq!(plans[0].target_entity, "OrderLine");
1314 assert_eq!(plans[0].local_key, "id");
1315 assert_eq!(plans[0].foreign_key, "order_id");
1316 assert!(plans[0].many);
1317 }
1318
1319 #[test]
1320 fn resolved_repository_builds_relation_query_from_parent_rows() {
1321 let mut ctx = UserContext::new()
1322 .with_metadata(
1323 InMemoryMetadataStore::new()
1324 .with_entity(entity())
1325 .with_entity(line_entity())
1326 .with_entity(product_entity()),
1327 )
1328 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1329 .with_repository_behavior_registry(
1330 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1331 );
1332 ctx.insert_resource(PostgresDialect);
1333 ctx.insert_resource(StubExecutor {
1334 affected: 1,
1335 rows: Vec::new(),
1336 });
1337
1338 let repo = ctx
1339 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1340 .unwrap();
1341 let parent_rows = vec![
1342 Record::from([(String::from("id"), Value::U64(11))]),
1343 Record::from([(String::from("id"), Value::U64(12))]),
1344 ];
1345
1346 let query = repo.relation_query("lines", &parent_rows).unwrap();
1347 let compiled = repo.compile(&query).unwrap();
1348 assert!(compiled.sql.contains("FROM \"orderline\""));
1349 assert!(compiled.sql.contains("\"order_id\" IN ($1, $2)"));
1350 assert_eq!(compiled.params, vec![Value::U64(11), Value::U64(12)]);
1351 }
1352
1353 #[test]
1354 fn resolved_repository_enhances_parent_rows_with_relations() {
1355 let mut ctx = UserContext::new()
1356 .with_metadata(
1357 InMemoryMetadataStore::new()
1358 .with_entity(entity())
1359 .with_entity(line_entity())
1360 .with_entity(product_entity()),
1361 )
1362 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1363 .with_repository_behavior_registry(
1364 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1365 );
1366 ctx.insert_resource(PostgresDialect);
1367 ctx.insert_resource(StubExecutor {
1368 affected: 1,
1369 rows: vec![
1370 Record::from([
1371 (String::from("id"), Value::U64(101)),
1372 (String::from("order_id"), Value::U64(11)),
1373 (String::from("name"), Value::Text(String::from("l1"))),
1374 ]),
1375 Record::from([
1376 (String::from("id"), Value::U64(102)),
1377 (String::from("order_id"), Value::U64(11)),
1378 (String::from("name"), Value::Text(String::from("l2"))),
1379 ]),
1380 Record::from([
1381 (String::from("id"), Value::U64(201)),
1382 (String::from("order_id"), Value::U64(12)),
1383 (String::from("name"), Value::Text(String::from("l3"))),
1384 ]),
1385 ],
1386 });
1387
1388 let repo = ctx
1389 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1390 .unwrap();
1391 let mut parents = vec![
1392 Record::from([(String::from("id"), Value::U64(11))]),
1393 Record::from([(String::from("id"), Value::U64(12))]),
1394 ];
1395
1396 repo.enhance_relations(&mut parents).unwrap();
1397
1398 match parents[0].get("lines") {
1399 Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
1400 other => panic!("unexpected lines payload: {other:?}"),
1401 }
1402 match parents[1].get("lines") {
1403 Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
1404 other => panic!("unexpected lines payload: {other:?}"),
1405 }
1406 }
1407
1408 #[test]
1409 fn resolved_repository_fetches_smart_list_of_entities() {
1410 let mut ctx = UserContext::new()
1411 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1412 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1413 ctx.insert_resource(PostgresDialect);
1414 ctx.insert_resource(StubExecutor {
1415 affected: 1,
1416 rows: vec![Record::from([
1417 (String::from("id"), Value::U64(7)),
1418 (String::from("version"), Value::I64(2)),
1419 (String::from("name"), Value::Text(String::from("typed"))),
1420 ])],
1421 });
1422
1423 let repo = ctx
1424 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1425 .unwrap();
1426 let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).unwrap();
1427
1428 assert_eq!(rows.len(), 1);
1429 assert_eq!(
1430 rows.first(),
1431 Some(&OrderEntity {
1432 id: 7,
1433 version: 2,
1434 name: String::from("typed"),
1435 })
1436 );
1437 }
1438
1439 #[test]
1440 fn resolved_repository_fetches_smart_list_of_derived_entities() {
1441 let mut ctx = UserContext::new()
1442 .with_metadata(
1443 InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor()),
1444 )
1445 .with_repository_registry(
1446 InMemoryRepositoryRegistry::new().with_entity("CatalogProduct"),
1447 );
1448 ctx.insert_resource(PostgresDialect);
1449 ctx.insert_resource(StubExecutor {
1450 affected: 1,
1451 rows: vec![Record::from([
1452 (String::from("id"), Value::U64(9)),
1453 (String::from("name"), Value::Text(String::from("derived"))),
1454 ])],
1455 });
1456
1457 let repo = ctx
1458 .resolve_repository::<PostgresDialect, StubExecutor>("CatalogProduct")
1459 .unwrap();
1460 let rows = repo
1461 .fetch_entities::<CatalogProductRow>(&repo.select())
1462 .unwrap();
1463
1464 assert_eq!(rows.len(), 1);
1465 assert_eq!(
1466 rows.first(),
1467 Some(&CatalogProductRow {
1468 id: 9,
1469 name: String::from("derived"),
1470 })
1471 );
1472 }
1473
1474 #[test]
1475 fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
1476 let mut ctx = UserContext::new()
1477 .with_metadata(
1478 InMemoryMetadataStore::new()
1479 .with_entity(OrderAggregateDynamic::entity_descriptor()),
1480 )
1481 .with_repository_registry(
1482 InMemoryRepositoryRegistry::new().with_entity("OrderAggregate"),
1483 );
1484 ctx.insert_resource(PostgresDialect);
1485 ctx.insert_resource(StubExecutor {
1486 affected: 1,
1487 rows: vec![Record::from([
1488 (String::from("id"), Value::U64(1)),
1489 (String::from("lineCount"), Value::I64(3)),
1490 (String::from("amount"), Value::F64(18.5)),
1491 ])],
1492 });
1493
1494 let repo = ctx
1495 .resolve_repository::<PostgresDialect, StubExecutor>("OrderAggregate")
1496 .unwrap();
1497 let rows = repo
1498 .fetch_entities::<OrderAggregateDynamic>(&repo.select())
1499 .unwrap();
1500
1501 assert_eq!(rows.len(), 1);
1502 assert_eq!(rows.data[0].id, 1);
1503 assert_eq!(rows.data[0].dynamic.get("lineCount"), Some(&Value::I64(3)));
1504 assert_eq!(rows.data[0].dynamic.get("amount"), Some(&Value::F64(18.5)));
1505 assert_eq!(
1506 rows.into_vec().into_iter().next().unwrap().into_json(),
1507 serde_json::json!({
1508 "id": 1,
1509 "lineCount": 3,
1510 "amount": 18.5
1511 })
1512 );
1513 }
1514
1515 #[test]
1516 fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
1517 let executor = QueueExecutor {
1518 affected: 1,
1519 rows: Mutex::new(VecDeque::from([
1520 vec![
1521 Record::from([
1522 (String::from("id"), Value::U64(1)),
1523 (String::from("version"), Value::I64(1)),
1524 (String::from("name"), Value::Text(String::from("first"))),
1525 ]),
1526 Record::from([
1527 (String::from("id"), Value::U64(2)),
1528 (String::from("version"), Value::I64(1)),
1529 (String::from("name"), Value::Text(String::from("second"))),
1530 ]),
1531 ],
1532 vec![Record::from([
1533 (String::from("order_id"), Value::U64(1)),
1534 (String::from("lineCount"), Value::I64(3)),
1535 ])],
1536 ])),
1537 queries: Mutex::new(Vec::new()),
1538 };
1539 let mut ctx = UserContext::new()
1540 .with_metadata(
1541 InMemoryMetadataStore::new()
1542 .with_entity(entity())
1543 .with_entity(line_entity()),
1544 )
1545 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1546 ctx.insert_resource(PostgresDialect);
1547 ctx.insert_resource(executor);
1548
1549 let repo = ctx
1550 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1551 .unwrap();
1552 let rows = repo
1553 .fetch_all_with_relation_aggregates(
1554 &repo
1555 .select()
1556 .project("id")
1557 .project("version")
1558 .project("name"),
1559 &[RelationAggregate::new(
1560 "lines",
1561 "lineCount",
1562 SelectQuery::new("OrderLine"),
1563 true,
1564 )],
1565 )
1566 .unwrap();
1567
1568 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1569 assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));
1570
1571 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1572 let queries = executor.queries.lock().unwrap();
1573 assert_eq!(queries.len(), 2);
1574 assert_eq!(
1575 queries[1],
1576 "SELECT \"order_id\", COUNT(*) AS \"lineCount\" FROM \"orderline\" WHERE (\"order_id\" IN ($1, $2)) GROUP BY \"order_id\""
1577 );
1578 }
1579
1580 #[test]
1581 fn resolved_repository_maps_relation_aggregate_storage_key_to_property_key() {
1582 let mut line = line_entity();
1583 line.properties
1584 .iter_mut()
1585 .find(|property| property.name == "order_id")
1586 .unwrap()
1587 .column_name = "order_ref".to_owned();
1588 let executor = QueueExecutor {
1589 affected: 1,
1590 rows: Mutex::new(VecDeque::from([
1591 vec![Record::from([
1592 (String::from("id"), Value::U64(1)),
1593 (String::from("version"), Value::I64(1)),
1594 (String::from("name"), Value::Text(String::from("first"))),
1595 ])],
1596 vec![Record::from([
1597 (String::from("order_ref"), Value::I64(1)),
1598 (String::from("lineCount"), Value::I64(3)),
1599 ])],
1600 ])),
1601 queries: Mutex::new(Vec::new()),
1602 };
1603 let mut ctx = UserContext::new()
1604 .with_metadata(
1605 InMemoryMetadataStore::new()
1606 .with_entity(entity())
1607 .with_entity(line),
1608 )
1609 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1610 ctx.insert_resource(PostgresDialect);
1611 ctx.insert_resource(executor);
1612
1613 let repo = ctx
1614 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1615 .unwrap();
1616 let rows = repo
1617 .fetch_all_with_relation_aggregates(
1618 &repo
1619 .select()
1620 .project("id")
1621 .project("version")
1622 .project("name"),
1623 &[RelationAggregate::new(
1624 "lines",
1625 "lineCount",
1626 SelectQuery::new("OrderLine"),
1627 true,
1628 )],
1629 )
1630 .unwrap();
1631
1632 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1633 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1634 assert_eq!(
1635 executor.queries.lock().unwrap()[1],
1636 "SELECT \"order_ref\", COUNT(*) AS \"lineCount\" FROM \"orderline\" WHERE (\"order_ref\" IN ($1)) GROUP BY \"order_ref\""
1637 );
1638 }
1639
1640 #[test]
1641 fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
1642 let executor = QueueExecutor {
1643 affected: 1,
1644 rows: Mutex::new(VecDeque::from([vec![Record::from([(
1645 String::from("count"),
1646 Value::I64(2),
1647 )])]])),
1648 queries: Mutex::new(Vec::new()),
1649 };
1650 let mut ctx = UserContext::new()
1651 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1652 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1653 ctx.insert_resource(PostgresDialect);
1654 ctx.insert_resource(executor);
1655 ctx.insert_resource(InMemoryAggregationCache::default());
1656
1657 let repo = ctx
1658 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1659 .unwrap();
1660 let query = repo
1661 .select()
1662 .count("count")
1663 .enable_aggregation_cache_for(60_000);
1664
1665 let first = repo.fetch_all(&query).unwrap();
1666 let second = repo.fetch_all(&query).unwrap();
1667
1668 assert_eq!(first, second);
1669 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1670 assert_eq!(executor.queries.lock().unwrap().len(), 1);
1671 }
1672
1673 #[test]
1674 fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
1675 let executor = QueueExecutor {
1676 affected: 1,
1677 rows: Mutex::new(VecDeque::from([
1678 vec![Record::from([(String::from("count"), Value::I64(2))])],
1679 vec![Record::from([(String::from("count"), Value::I64(3))])],
1680 ])),
1681 queries: Mutex::new(Vec::new()),
1682 };
1683 let mut ctx = UserContext::new()
1684 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1685 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1686 ctx.insert_resource(PostgresDialect);
1687 ctx.insert_resource(executor);
1688 ctx.insert_resource(
1689 Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"))
1690 as Arc<dyn AggregationCacheBackend>,
1691 );
1692
1693 let repo = ctx
1694 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1695 .unwrap();
1696 let query = repo
1697 .select()
1698 .count("count")
1699 .enable_aggregation_cache_for(60_000);
1700
1701 let first = repo.fetch_all(&query).unwrap();
1702 let cached = repo.fetch_all(&query).unwrap();
1703 repo.insert(
1704 &InsertCommand::new("Order")
1705 .value("id", 9_u64)
1706 .value("version", 1_i64)
1707 .value("name", "new"),
1708 )
1709 .unwrap();
1710 let refreshed = repo.fetch_all(&query).unwrap();
1711
1712 assert_eq!(first, cached);
1713 assert_ne!(cached, refreshed);
1714 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1715 assert_eq!(executor.queries.lock().unwrap().len(), 2);
1716 }
1717
1718 #[test]
1719 fn aggregation_cache_propagates_to_relation_aggregates() {
1720 let parent_rows = vec![
1721 Record::from([
1722 (String::from("id"), Value::U64(1)),
1723 (String::from("version"), Value::I64(1)),
1724 (String::from("name"), Value::Text(String::from("first"))),
1725 ]),
1726 Record::from([
1727 (String::from("id"), Value::U64(2)),
1728 (String::from("version"), Value::I64(1)),
1729 (String::from("name"), Value::Text(String::from("second"))),
1730 ]),
1731 ];
1732 let aggregate_rows = vec![Record::from([
1733 (String::from("order_id"), Value::U64(1)),
1734 (String::from("lineCount"), Value::I64(3)),
1735 ])];
1736 let executor = QueueExecutor {
1737 affected: 1,
1738 rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
1739 queries: Mutex::new(Vec::new()),
1740 };
1741 let mut ctx = UserContext::new()
1742 .with_metadata(
1743 InMemoryMetadataStore::new()
1744 .with_entity(entity())
1745 .with_entity(line_entity()),
1746 )
1747 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1748 ctx.insert_resource(PostgresDialect);
1749 ctx.insert_resource(executor);
1750 ctx.insert_resource(InMemoryAggregationCache::default());
1751
1752 let repo = ctx
1753 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1754 .unwrap();
1755 let query = repo
1756 .select()
1757 .project("id")
1758 .project("version")
1759 .project("name")
1760 .enable_aggregation_cache_for(60_000)
1761 .propagate_aggregation_cache(60_000);
1762 let aggregate =
1763 RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
1764
1765 let first = repo
1766 .fetch_all_with_relation_aggregates(&query, &[aggregate.clone()])
1767 .unwrap();
1768 let second = repo
1769 .fetch_all_with_relation_aggregates(&query, &[aggregate])
1770 .unwrap();
1771
1772 assert_eq!(first, second);
1773 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1774 assert_eq!(executor.queries.lock().unwrap().len(), 2);
1775 }
1776
1777 #[test]
1778 fn memory_repository_fetches_smart_list_entities_with_query_features() {
1779 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1780 let repository = MemoryRepository::new(metadata).with_rows(
1781 "Order",
1782 vec![
1783 Record::from([
1784 (String::from("id"), Value::U64(1)),
1785 (String::from("version"), Value::I64(1)),
1786 (String::from("name"), Value::Text(String::from("alpha"))),
1787 ]),
1788 Record::from([
1789 (String::from("id"), Value::U64(2)),
1790 (String::from("version"), Value::I64(1)),
1791 (String::from("name"), Value::Text(String::from("beta"))),
1792 ]),
1793 Record::from([
1794 (String::from("id"), Value::U64(3)),
1795 (String::from("version"), Value::I64(1)),
1796 (String::from("name"), Value::Text(String::from("gamma"))),
1797 ]),
1798 ],
1799 );
1800
1801 let query = teaql_core::SelectQuery::new("Order")
1802 .filter(Expr::Binary {
1803 left: Box::new(Expr::column("id")),
1804 op: teaql_core::BinaryOp::Gte,
1805 right: Box::new(Expr::value(2_u64)),
1806 })
1807 .order_by(OrderBy::desc("id"))
1808 .limit(1);
1809
1810 let orders = repository.fetch_entities::<Order>(&query).unwrap();
1811
1812 assert_eq!(orders.ids(), vec![Value::U64(3)]);
1813 assert_eq!(orders.versions(), vec![1]);
1814 assert_eq!(orders.first().unwrap().name, "gamma");
1815 }
1816
1817 #[test]
1818 fn memory_repository_runs_aggregates() {
1819 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1820 let repository = MemoryRepository::new(metadata).with_rows(
1821 "Order",
1822 vec![
1823 Record::from([
1824 (String::from("id"), Value::U64(1)),
1825 (String::from("version"), Value::I64(1)),
1826 (String::from("name"), Value::Text(String::from("alpha"))),
1827 ]),
1828 Record::from([
1829 (String::from("id"), Value::U64(2)),
1830 (String::from("version"), Value::I64(2)),
1831 (String::from("name"), Value::Text(String::from("beta"))),
1832 ]),
1833 ],
1834 );
1835
1836 let query = teaql_core::SelectQuery {
1837 entity: String::from("Order"),
1838 projection: Vec::new(),
1839 expr_projection: Vec::new(),
1840 filter: None,
1841 having: None,
1842 order_by: Vec::new(),
1843 slice: None,
1844 aggregates: vec![
1845 Aggregate {
1846 function: AggregateFunction::Count,
1847 field: String::from("id"),
1848 alias: String::from("count"),
1849 },
1850 Aggregate {
1851 function: AggregateFunction::Sum,
1852 field: String::from("version"),
1853 alias: String::from("versionSum"),
1854 },
1855 ],
1856 group_by: Vec::new(),
1857 relations: Vec::new(),
1858 aggregation_cache: None,
1859 comment: None,
1860 raw_sql: None,
1861 raw_sql_search_criteria: Vec::new(),
1862 json_expr: None,
1863 dynamic_properties: Vec::new(),
1864 raw_projections: Vec::new(),
1865 object_group_bys: Vec::new(),
1866 child_enhancements: Vec::new(),
1867 };
1868
1869 let rows = repository.fetch_all(&query).unwrap();
1870
1871 assert_eq!(rows.len(), 1);
1872 assert_eq!(rows[0].get("count"), Some(&Value::U64(2)));
1873 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
1874 }
1875
1876 #[test]
1877 fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
1878 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1879 let repository = MemoryRepository::new(metadata).with_rows(
1880 "Order",
1881 vec![
1882 Record::from([
1883 (String::from("id"), Value::U64(1)),
1884 (String::from("version"), Value::I64(1)),
1885 (String::from("name"), Value::Text(String::from("alpha"))),
1886 ]),
1887 Record::from([
1888 (String::from("id"), Value::U64(2)),
1889 (String::from("version"), Value::I64(2)),
1890 (String::from("name"), Value::Text(String::from("alpha"))),
1891 ]),
1892 Record::from([
1893 (String::from("id"), Value::U64(3)),
1894 (String::from("version"), Value::I64(3)),
1895 (String::from("name"), Value::Text(String::from("tmp-beta"))),
1896 ]),
1897 ],
1898 );
1899
1900 let rows = repository
1901 .fetch_all(
1902 &teaql_core::SelectQuery::new("Order")
1903 .filter(
1904 Expr::between("version", 1_i64, 3_i64)
1905 .and_expr(Expr::not_like("name", "tmp%"))
1906 .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")])),
1907 )
1908 .group_by("name")
1909 .count("total")
1910 .sum("version", "versionSum"),
1911 )
1912 .unwrap();
1913
1914 assert_eq!(rows.len(), 1);
1915 assert_eq!(
1916 rows[0].get("name"),
1917 Some(&Value::Text(String::from("alpha")))
1918 );
1919 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
1920 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
1921 }
1922
1923 #[test]
1924 fn memory_repository_runs_extended_aggregates_and_having() {
1925 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1926 let repository = MemoryRepository::new(metadata).with_rows(
1927 "Order",
1928 vec![
1929 Record::from([
1930 (String::from("id"), Value::U64(1)),
1931 (String::from("version"), Value::I64(1)),
1932 (String::from("name"), Value::Text(String::from("alpha"))),
1933 ]),
1934 Record::from([
1935 (String::from("id"), Value::U64(2)),
1936 (String::from("version"), Value::I64(3)),
1937 (String::from("name"), Value::Text(String::from("alpha"))),
1938 ]),
1939 Record::from([
1940 (String::from("id"), Value::U64(3)),
1941 (String::from("version"), Value::I64(7)),
1942 (String::from("name"), Value::Text(String::from("beta"))),
1943 ]),
1944 ],
1945 );
1946
1947 let rows = repository
1948 .fetch_all(
1949 &teaql_core::SelectQuery::new("Order")
1950 .group_by("name")
1951 .count("total")
1952 .stddev("version", "stddevVersion")
1953 .var_pop("version", "varPopVersion")
1954 .bit_or("version", "bitOrVersion")
1955 .having(Expr::gt("total", 1_i64)),
1956 )
1957 .unwrap();
1958
1959 assert_eq!(rows.len(), 1);
1960 assert_eq!(
1961 rows[0].get("name"),
1962 Some(&Value::Text(String::from("alpha")))
1963 );
1964 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
1965 assert_eq!(
1966 rows[0].get("stddevVersion").map(Value::to_json_value),
1967 Some(serde_json::Value::String(
1968 "1.4142135623730951454746218583".to_owned()
1969 ))
1970 );
1971 assert_eq!(
1972 rows[0].get("varPopVersion"),
1973 Some(&Value::Decimal(Decimal::ONE))
1974 );
1975 assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(3)));
1976 }
1977
1978 #[test]
1979 fn memory_repository_runs_sound_like_filter() {
1980 let metadata = InMemoryMetadataStore::new().with_entity(entity());
1981 let repository = MemoryRepository::new(metadata).with_rows(
1982 "Order",
1983 vec![
1984 Record::from([
1985 (String::from("id"), Value::U64(1)),
1986 (String::from("version"), Value::I64(1)),
1987 (String::from("name"), Value::Text(String::from("Robert"))),
1988 ]),
1989 Record::from([
1990 (String::from("id"), Value::U64(2)),
1991 (String::from("version"), Value::I64(1)),
1992 (String::from("name"), Value::Text(String::from("Rupert"))),
1993 ]),
1994 Record::from([
1995 (String::from("id"), Value::U64(3)),
1996 (String::from("version"), Value::I64(1)),
1997 (String::from("name"), Value::Text(String::from("Ashcraft"))),
1998 ]),
1999 ],
2000 );
2001
2002 let rows = repository
2003 .fetch_all(
2004 &teaql_core::SelectQuery::new("Order")
2005 .filter(Expr::sound_like("name", "Robert"))
2006 .order_asc("id"),
2007 )
2008 .unwrap();
2009
2010 assert_eq!(rows.len(), 2);
2011 assert_eq!(rows[0].get("name"), Some(&Value::Text("Robert".to_owned())));
2012 assert_eq!(rows[1].get("name"), Some(&Value::Text("Rupert".to_owned())));
2013 }
2014
2015 #[test]
2016 fn memory_repository_runs_java_style_string_match_filters() {
2017 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2018 let repository = MemoryRepository::new(metadata).with_rows(
2019 "Order",
2020 vec![
2021 Record::from([
2022 (String::from("id"), Value::U64(1)),
2023 (String::from("version"), Value::I64(1)),
2024 (String::from("name"), Value::Text(String::from("tea-order"))),
2025 ]),
2026 Record::from([
2027 (String::from("id"), Value::U64(2)),
2028 (String::from("version"), Value::I64(1)),
2029 (
2030 String::from("name"),
2031 Value::Text(String::from("coffee-order")),
2032 ),
2033 ]),
2034 Record::from([
2035 (String::from("id"), Value::U64(3)),
2036 (String::from("version"), Value::I64(1)),
2037 (
2038 String::from("name"),
2039 Value::Text(String::from("tea-archived")),
2040 ),
2041 ]),
2042 ],
2043 );
2044
2045 let rows = repository
2046 .fetch_all(
2047 &teaql_core::SelectQuery::new("Order")
2048 .filter(
2049 Expr::contain("name", "tea")
2050 .and_expr(Expr::begin_with("name", "tea"))
2051 .and_expr(Expr::end_with("name", "order"))
2052 .and_expr(Expr::not_contain("name", "coffee"))
2053 .and_expr(Expr::not_begin_with("name", "archived"))
2054 .and_expr(Expr::not_end_with("name", "draft")),
2055 )
2056 .order_asc("id"),
2057 )
2058 .unwrap();
2059
2060 assert_eq!(rows.len(), 1);
2061 assert_eq!(
2062 rows[0].get("name"),
2063 Some(&Value::Text("tea-order".to_owned()))
2064 );
2065 }
2066
2067 #[test]
2068 fn memory_repository_runs_property_to_property_filters() {
2069 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2070 let repository = MemoryRepository::new(metadata).with_rows(
2071 "Order",
2072 vec![
2073 Record::from([
2074 (String::from("id"), Value::U64(1)),
2075 (String::from("version"), Value::I64(2)),
2076 (String::from("name"), Value::Text(String::from("keep"))),
2077 ]),
2078 Record::from([
2079 (String::from("id"), Value::U64(2)),
2080 (String::from("version"), Value::I64(1)),
2081 (String::from("name"), Value::Text(String::from("skip"))),
2082 ]),
2083 ],
2084 );
2085
2086 let rows = repository
2087 .fetch_all(
2088 &teaql_core::SelectQuery::new("Order")
2089 .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
2090 .order_asc("id"),
2091 )
2092 .unwrap();
2093
2094 assert_eq!(rows.len(), 1);
2095 assert_eq!(rows[0].get("name"), Some(&Value::Text("keep".to_owned())));
2096 }
2097
2098 #[test]
2099 fn memory_repository_supports_mutations_and_optimistic_locking() {
2100 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2101 let repository = MemoryRepository::new(metadata);
2102
2103 repository
2104 .insert(
2105 &InsertCommand::new("Order")
2106 .value("id", 10_u64)
2107 .value("version", 1_i64)
2108 .value("name", "draft"),
2109 )
2110 .unwrap();
2111 repository
2112 .update(
2113 &UpdateCommand::new("Order", 10_u64)
2114 .expected_version(1)
2115 .value("name", "submitted"),
2116 )
2117 .unwrap();
2118
2119 let row = repository
2120 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2121 .unwrap()
2122 .pop()
2123 .unwrap();
2124 assert_eq!(
2125 row.get("name"),
2126 Some(&Value::Text(String::from("submitted")))
2127 );
2128 assert_eq!(row.get("version"), Some(&Value::I64(2)));
2129
2130 let conflict = repository
2131 .update(
2132 &UpdateCommand::new("Order", 10_u64)
2133 .expected_version(1)
2134 .value("name", "stale"),
2135 )
2136 .unwrap_err();
2137 assert!(matches!(
2138 conflict,
2139 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
2140 ));
2141
2142 repository
2143 .delete(&DeleteCommand::new("Order", 10_u64).expected_version(2))
2144 .unwrap();
2145 let row = repository
2146 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2147 .unwrap()
2148 .pop()
2149 .unwrap();
2150 assert_eq!(row.get("version"), Some(&Value::I64(-3)));
2151
2152 repository
2153 .recover(&RecoverCommand::new("Order", 10_u64, -3))
2154 .unwrap();
2155 let row = repository
2156 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2157 .unwrap()
2158 .pop()
2159 .unwrap();
2160 assert_eq!(row.get("version"), Some(&Value::I64(4)));
2161 }
2162}
2163
2164#[cfg(all(test, feature = "sqlx"))]
2165mod sqlx_integration_tests {
2166 use super::sqlx_support::{
2167 MutationExecutorError, PgIdSpaceGenerator, PgMutationExecutor, PgTransactionExecutor,
2168 SqliteIdSpaceGenerator, SqliteMutationExecutor,
2169 };
2170 use super::{
2171 GraphMutationKind, GraphNode, GraphTransactionBoundary, InMemoryMetadataStore,
2172 InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry, QueryExecutor,
2173 RepositoryBehavior, UserContext,
2174 };
2175 use chrono::{NaiveDate, TimeZone, Utc};
2176 use teaql_core::{
2177 DataType, Decimal, DeleteCommand, EntityDescriptor, Expr, InsertCommand,
2178 PropertyDescriptor, Record, RecoverCommand, SelectQuery, UpdateCommand, Value,
2179 };
2180 use teaql_dialect_pg::PostgresDialect;
2181 use teaql_dialect_sqlite::SqliteDialect;
2182 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
2183 use teaql_sql::SqlDialect;
2184 use tokio::runtime::Handle;
2185 use tokio::task::block_in_place;
2186
2187 const ORDER_DEFAULT_PROJECTION: &str = "\"id\", \"version\", \"name\"";
2188
2189 fn entity() -> EntityDescriptor {
2190 EntityDescriptor::new("Order")
2191 .table_name("orders")
2192 .property(
2193 PropertyDescriptor::new("id", DataType::U64)
2194 .column_name("id")
2195 .id()
2196 .not_null(),
2197 )
2198 .property(
2199 PropertyDescriptor::new("version", DataType::I64)
2200 .column_name("version")
2201 .version()
2202 .not_null(),
2203 )
2204 .property(
2205 PropertyDescriptor::new("name", DataType::Text)
2206 .column_name("name")
2207 .not_null(),
2208 )
2209 .relation(
2210 teaql_core::RelationDescriptor::new("lines", "OrderLine")
2211 .local_key("id")
2212 .foreign_key("order_id")
2213 .many(),
2214 )
2215 }
2216
2217 fn sqlite_entity_keep_missing() -> EntityDescriptor {
2218 EntityDescriptor::new("Order")
2219 .table_name("orders")
2220 .property(
2221 PropertyDescriptor::new("id", DataType::U64)
2222 .column_name("id")
2223 .id()
2224 .not_null(),
2225 )
2226 .property(
2227 PropertyDescriptor::new("version", DataType::I64)
2228 .column_name("version")
2229 .version()
2230 .not_null(),
2231 )
2232 .property(
2233 PropertyDescriptor::new("name", DataType::Text)
2234 .column_name("name")
2235 .not_null(),
2236 )
2237 .relation(
2238 teaql_core::RelationDescriptor::new("lines", "OrderLine")
2239 .local_key("id")
2240 .foreign_key("order_id")
2241 .many()
2242 .keep_missing(),
2243 )
2244 }
2245
2246 fn line_entity() -> EntityDescriptor {
2247 EntityDescriptor::new("OrderLine")
2248 .table_name("orderline")
2249 .property(
2250 PropertyDescriptor::new("id", DataType::U64)
2251 .column_name("id")
2252 .id()
2253 .not_null(),
2254 )
2255 .property(
2256 PropertyDescriptor::new("version", DataType::I64)
2257 .column_name("version")
2258 .version(),
2259 )
2260 .property(
2261 PropertyDescriptor::new("order_id", DataType::U64)
2262 .column_name("order_id")
2263 .not_null(),
2264 )
2265 .property(
2266 PropertyDescriptor::new("name", DataType::Text)
2267 .column_name("name")
2268 .not_null(),
2269 )
2270 .property(
2271 PropertyDescriptor::new("product_id", DataType::U64)
2272 .column_name("product_id")
2273 .not_null(),
2274 )
2275 .relation(
2276 teaql_core::RelationDescriptor::new("product", "Product")
2277 .local_key("product_id")
2278 .foreign_key("id"),
2279 )
2280 }
2281
2282 fn product_entity() -> EntityDescriptor {
2283 EntityDescriptor::new("Product")
2284 .table_name("product")
2285 .property(
2286 PropertyDescriptor::new("id", DataType::U64)
2287 .column_name("id")
2288 .id()
2289 .not_null(),
2290 )
2291 .property(
2292 PropertyDescriptor::new("name", DataType::Text)
2293 .column_name("name")
2294 .not_null(),
2295 )
2296 }
2297
2298 fn typed_entity() -> EntityDescriptor {
2299 EntityDescriptor::new("TypedValue")
2300 .table_name("typed_value")
2301 .property(
2302 PropertyDescriptor::new("id", DataType::U64)
2303 .column_name("id")
2304 .id()
2305 .not_null(),
2306 )
2307 .property(
2308 PropertyDescriptor::new("payload", DataType::Json)
2309 .column_name("payload")
2310 .not_null(),
2311 )
2312 .property(
2313 PropertyDescriptor::new("amount", DataType::Decimal)
2314 .column_name("amount")
2315 .not_null(),
2316 )
2317 .property(
2318 PropertyDescriptor::new("birthday", DataType::Date)
2319 .column_name("birthday")
2320 .not_null(),
2321 )
2322 .property(
2323 PropertyDescriptor::new("happened_at", DataType::Timestamp)
2324 .column_name("happened_at")
2325 .not_null(),
2326 )
2327 }
2328
2329 struct OrderBehavior;
2330 struct NestedOrderBehavior;
2331
2332 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2333 #[teaql(entity = "Product", table = "product")]
2334 struct ProductEntityRow {
2335 #[teaql(id)]
2336 id: u64,
2337 name: String,
2338 }
2339
2340 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2341 #[teaql(entity = "OrderLine", table = "orderline")]
2342 struct OrderLineEntityRow {
2343 #[teaql(id)]
2344 id: u64,
2345 #[teaql(column = "order_id")]
2346 order_id: u64,
2347 name: String,
2348 #[teaql(column = "product_id")]
2349 product_id: u64,
2350 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
2351 product: Option<ProductEntityRow>,
2352 }
2353
2354 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2355 #[teaql(entity = "Order", table = "orders")]
2356 struct OrderAggregateRow {
2357 #[teaql(id)]
2358 id: u64,
2359 #[teaql(version)]
2360 version: i64,
2361 name: String,
2362 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
2363 lines: teaql_core::SmartList<OrderLineEntityRow>,
2364 }
2365
2366 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2367 #[teaql(entity = "Order", table = "orders")]
2368 struct Order {
2369 #[teaql(id)]
2370 id: u64,
2371 #[teaql(version)]
2372 version: i64,
2373 name: String,
2374 }
2375
2376 impl RepositoryBehavior for OrderBehavior {
2377 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2378 vec!["lines".to_owned()]
2379 }
2380 }
2381
2382 impl RepositoryBehavior for NestedOrderBehavior {
2383 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2384 vec!["lines.product".to_owned()]
2385 }
2386 }
2387
2388 #[derive(Clone)]
2389 struct SqliteSyncExecutor {
2390 inner: SqliteMutationExecutor,
2391 }
2392
2393 impl SqliteSyncExecutor {
2394 fn new(inner: SqliteMutationExecutor) -> Self {
2395 Self { inner }
2396 }
2397 }
2398
2399 impl QueryExecutor for SqliteSyncExecutor {
2400 type Error = MutationExecutorError;
2401
2402 fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2403 let handle = Handle::current();
2404 block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2405 }
2406
2407 fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2408 let handle = Handle::current();
2409 block_in_place(|| handle.block_on(self.inner.execute(query)))
2410 }
2411
2412 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2413 let handle = Handle::current();
2414 block_in_place(|| handle.block_on(self.inner.begin_transaction()))?;
2415 Ok(GraphTransactionBoundary::Started)
2416 }
2417
2418 fn commit_transaction(&self) -> Result<(), Self::Error> {
2419 let handle = Handle::current();
2420 block_in_place(|| handle.block_on(self.inner.commit_transaction()))
2421 }
2422
2423 fn rollback_transaction(&self) -> Result<(), Self::Error> {
2424 let handle = Handle::current();
2425 block_in_place(|| handle.block_on(self.inner.rollback_transaction()))
2426 }
2427 }
2428
2429 #[derive(Clone)]
2430 struct PgSyncExecutor {
2431 inner: PgMutationExecutor,
2432 }
2433
2434 impl PgSyncExecutor {
2435 fn new(inner: PgMutationExecutor) -> Self {
2436 Self { inner }
2437 }
2438 }
2439
2440 impl QueryExecutor for PgSyncExecutor {
2441 type Error = MutationExecutorError;
2442
2443 fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2444 let handle = Handle::current();
2445 block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2446 }
2447
2448 fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2449 let handle = Handle::current();
2450 block_in_place(|| handle.block_on(self.inner.execute(query)))
2451 }
2452 }
2453
2454 #[derive(Clone)]
2455 struct PgTxSyncExecutor {
2456 inner: PgTransactionExecutor,
2457 }
2458
2459 impl PgTxSyncExecutor {
2460 fn new(inner: PgTransactionExecutor) -> Self {
2461 Self { inner }
2462 }
2463 }
2464
2465 impl QueryExecutor for PgTxSyncExecutor {
2466 type Error = MutationExecutorError;
2467
2468 fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2469 let handle = Handle::current();
2470 block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2471 }
2472
2473 fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2474 let handle = Handle::current();
2475 block_in_place(|| handle.block_on(self.inner.execute(query)))
2476 }
2477
2478 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2479 Ok(GraphTransactionBoundary::AlreadyActive)
2480 }
2481
2482 fn rollback_transaction(&self) -> Result<(), Self::Error> {
2483 let handle = Handle::current();
2484 block_in_place(|| handle.block_on(self.inner.rollback()))
2485 }
2486 }
2487
2488 #[tokio::test]
2489 async fn sqlite_executor_runs_crud_flow() {
2490 use sqlx::sqlite::SqlitePoolOptions;
2491
2492 let pool = SqlitePoolOptions::new()
2493 .max_connections(1)
2494 .connect("sqlite::memory:")
2495 .await
2496 .unwrap();
2497
2498 let executor = SqliteMutationExecutor::new(pool.clone());
2499 let dialect = SqliteDialect;
2500 let entity = entity();
2501 executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2502
2503 let insert = dialect
2504 .compile_insert(
2505 &entity,
2506 &InsertCommand::new("Order")
2507 .value("id", 1_u64)
2508 .value("version", 1_i64)
2509 .value("name", "first"),
2510 )
2511 .unwrap();
2512 assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2513
2514 let select = dialect
2515 .compile_select(
2516 &entity,
2517 &SelectQuery::new("Order")
2518 .project("id")
2519 .project("version")
2520 .project("name")
2521 .filter(Expr::eq("id", 1_u64)),
2522 )
2523 .unwrap();
2524 let rows = executor.fetch_all(&select).await.unwrap();
2525 assert_eq!(rows.len(), 1);
2526 assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
2527 assert_eq!(rows[0].get("version"), Some(&Value::I64(1)));
2528 assert_eq!(rows[0].get("name"), Some(&Value::Text("first".to_owned())));
2529
2530 let update = dialect
2531 .compile_update(
2532 &entity,
2533 &UpdateCommand::new("Order", 1_u64)
2534 .expected_version(1)
2535 .value("name", "second"),
2536 )
2537 .unwrap();
2538 assert_eq!(executor.execute(&update).await.unwrap(), 1);
2539
2540 let after_update = executor.fetch_all(&select).await.unwrap();
2541 assert_eq!(after_update[0].get("version"), Some(&Value::I64(2)));
2542 assert_eq!(
2543 after_update[0].get("name"),
2544 Some(&Value::Text("second".to_owned()))
2545 );
2546
2547 let delete = dialect
2548 .compile_delete(
2549 &entity,
2550 &DeleteCommand::new("Order", 1_u64).expected_version(2),
2551 )
2552 .unwrap();
2553 assert_eq!(executor.execute(&delete).await.unwrap(), 1);
2554
2555 let after_delete = executor.fetch_all(&select).await.unwrap();
2556 assert_eq!(after_delete[0].get("version"), Some(&Value::I64(-3)));
2557
2558 let recover = dialect
2559 .compile_recover(&entity, &RecoverCommand::new("Order", 1_u64, -3))
2560 .unwrap();
2561 assert_eq!(executor.execute(&recover).await.unwrap(), 1);
2562
2563 let after_recover = executor.fetch_all(&select).await.unwrap();
2564 assert_eq!(after_recover[0].get("version"), Some(&Value::I64(4)));
2565 }
2566
2567 #[tokio::test(flavor = "multi_thread")]
2568 async fn sqlite_executor_enhances_relations() {
2569 use sqlx::sqlite::SqlitePoolOptions;
2570
2571 let pool = SqlitePoolOptions::new()
2572 .max_connections(1)
2573 .connect("sqlite::memory:")
2574 .await
2575 .unwrap();
2576
2577 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2578 let order = entity();
2579 let line = line_entity();
2580 mutation_executor
2581 .ensure_schema(&SqliteDialect, &[&order, &line])
2582 .await
2583 .unwrap();
2584
2585 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1'), (2, 1, 'o2')")
2586 .execute(&pool)
2587 .await
2588 .unwrap();
2589 sqlx::query(
2590 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2591 (101, 1, 1001, 'l1'),
2592 (102, 1, 1002, 'l2'),
2593 (201, 2, 1003, 'l3')",
2594 )
2595 .execute(&pool)
2596 .await
2597 .unwrap();
2598
2599 let executor = SqliteSyncExecutor::new(mutation_executor);
2600 let mut ctx = UserContext::new()
2601 .with_metadata(
2602 InMemoryMetadataStore::new()
2603 .with_entity(order)
2604 .with_entity(line)
2605 .with_entity(product_entity()),
2606 )
2607 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2608 .with_repository_behavior_registry(
2609 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
2610 );
2611 ctx.insert_resource(SqliteDialect);
2612 ctx.insert_resource(executor);
2613
2614 let repo = ctx
2615 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2616 .unwrap();
2617 let mut parents = repo
2618 .fetch_all(
2619 &repo
2620 .select()
2621 .project("id")
2622 .project("version")
2623 .project("name")
2624 .order_by(teaql_core::OrderBy::asc("id")),
2625 )
2626 .unwrap();
2627
2628 repo.enhance_relations(&mut parents).unwrap();
2629
2630 assert_eq!(parents.len(), 2);
2631 match parents[0].get("lines") {
2632 Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
2633 other => panic!("unexpected first lines payload: {other:?}"),
2634 }
2635 match parents[1].get("lines") {
2636 Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
2637 other => panic!("unexpected second lines payload: {other:?}"),
2638 }
2639 }
2640
2641 #[tokio::test(flavor = "multi_thread")]
2642 async fn sqlite_executor_enhances_nested_relations() {
2643 use sqlx::sqlite::SqlitePoolOptions;
2644
2645 let pool = SqlitePoolOptions::new()
2646 .max_connections(1)
2647 .connect("sqlite::memory:")
2648 .await
2649 .unwrap();
2650
2651 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2652 let order = entity();
2653 let line = line_entity();
2654 let product = product_entity();
2655 mutation_executor
2656 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2657 .await
2658 .unwrap();
2659
2660 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2661 .execute(&pool)
2662 .await
2663 .unwrap();
2664 sqlx::query(
2665 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2666 (101, 1, 1001, 'l1'),
2667 (102, 1, 1002, 'l2')",
2668 )
2669 .execute(&pool)
2670 .await
2671 .unwrap();
2672 sqlx::query(
2673 "INSERT INTO product (id, name) VALUES
2674 (1001, 'p1'),
2675 (1002, 'p2')",
2676 )
2677 .execute(&pool)
2678 .await
2679 .unwrap();
2680
2681 let executor = SqliteSyncExecutor::new(mutation_executor);
2682 let mut ctx = UserContext::new()
2683 .with_metadata(
2684 InMemoryMetadataStore::new()
2685 .with_entity(order)
2686 .with_entity(line)
2687 .with_entity(product),
2688 )
2689 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2690 .with_repository_behavior_registry(
2691 InMemoryRepositoryBehaviorRegistry::new()
2692 .with_behavior("Order", NestedOrderBehavior),
2693 );
2694 ctx.insert_resource(SqliteDialect);
2695 ctx.insert_resource(executor);
2696
2697 let repo = ctx
2698 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2699 .unwrap();
2700 let mut parents = repo
2701 .fetch_all(
2702 &repo
2703 .select()
2704 .project("id")
2705 .project("version")
2706 .project("name"),
2707 )
2708 .unwrap();
2709
2710 repo.enhance_relations(&mut parents).unwrap();
2711
2712 match parents[0].get("lines") {
2713 Some(Value::List(lines)) => {
2714 assert_eq!(lines.len(), 2);
2715 for line in lines {
2716 match line {
2717 Value::Object(line_record) => match line_record.get("product") {
2718 Some(Value::Object(product)) => {
2719 assert!(product.get("name").is_some());
2720 }
2721 other => panic!("unexpected product payload: {other:?}"),
2722 },
2723 other => panic!("unexpected line payload: {other:?}"),
2724 }
2725 }
2726 }
2727 other => panic!("unexpected nested lines payload: {other:?}"),
2728 }
2729 }
2730
2731 #[tokio::test(flavor = "multi_thread")]
2732 async fn sqlite_executor_enhances_query_relation_with_child_query() {
2733 use sqlx::sqlite::SqlitePoolOptions;
2734
2735 let pool = SqlitePoolOptions::new()
2736 .max_connections(1)
2737 .connect("sqlite::memory:")
2738 .await
2739 .unwrap();
2740
2741 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2742 let order = entity();
2743 let line = line_entity();
2744 let product = product_entity();
2745 mutation_executor
2746 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2747 .await
2748 .unwrap();
2749
2750 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2751 .execute(&pool)
2752 .await
2753 .unwrap();
2754 sqlx::query(
2755 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2756 (101, 1, 1001, 'keep'),
2757 (102, 1, 1002, 'drop')",
2758 )
2759 .execute(&pool)
2760 .await
2761 .unwrap();
2762 sqlx::query(
2763 "INSERT INTO product (id, name) VALUES
2764 (1001, 'p1'),
2765 (1002, 'p2')",
2766 )
2767 .execute(&pool)
2768 .await
2769 .unwrap();
2770
2771 let executor = SqliteSyncExecutor::new(mutation_executor);
2772 let mut ctx = UserContext::new()
2773 .with_metadata(
2774 InMemoryMetadataStore::new()
2775 .with_entity(order)
2776 .with_entity(line)
2777 .with_entity(product),
2778 )
2779 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2780 ctx.insert_resource(SqliteDialect);
2781 ctx.insert_resource(executor);
2782
2783 let repo = ctx
2784 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2785 .unwrap();
2786 let query = repo.select().relation_query(
2787 "lines",
2788 SelectQuery::new("OrderLine")
2789 .project("name")
2790 .filter(Expr::eq("name", "keep"))
2791 .order_desc("id")
2792 .page(0, 10)
2793 .relation_query("product", SelectQuery::new("Product").project("name")),
2794 );
2795 let mut parents = repo
2796 .fetch_all(
2797 &repo
2798 .select()
2799 .project("id")
2800 .project("version")
2801 .project("name"),
2802 )
2803 .unwrap();
2804
2805 repo.enhance_query_relations(&mut parents, &query).unwrap();
2806
2807 match parents[0].get("lines") {
2808 Some(Value::List(lines)) => {
2809 assert_eq!(lines.len(), 1);
2810 let Value::Object(line) = &lines[0] else {
2811 panic!("unexpected line payload: {:?}", lines[0]);
2812 };
2813 assert_eq!(line.get("name"), Some(&Value::Text("keep".to_owned())));
2814 assert_eq!(line.get("order_id"), Some(&Value::I64(1)));
2815 assert_eq!(line.get("product_id"), Some(&Value::I64(1001)));
2816 match line.get("product") {
2817 Some(Value::Object(product)) => {
2818 assert_eq!(product.get("name"), Some(&Value::Text("p1".to_owned())));
2819 assert_eq!(product.get("id"), Some(&Value::I64(1001)));
2820 }
2821 other => panic!("unexpected product payload: {other:?}"),
2822 }
2823 }
2824 other => panic!("unexpected query relation payload: {other:?}"),
2825 }
2826 }
2827
2828 #[tokio::test]
2829 async fn sqlite_executor_ensure_schema_adds_missing_columns() {
2830 use sqlx::Row;
2831 use sqlx::sqlite::SqlitePoolOptions;
2832
2833 let pool = SqlitePoolOptions::new()
2834 .max_connections(1)
2835 .connect("sqlite::memory:")
2836 .await
2837 .unwrap();
2838
2839 sqlx::query(
2840 "CREATE TABLE orders (
2841 id INTEGER PRIMARY KEY
2842 )",
2843 )
2844 .execute(&pool)
2845 .await
2846 .unwrap();
2847
2848 let executor = SqliteMutationExecutor::new(pool.clone());
2849 let dialect = SqliteDialect;
2850 let entity = entity();
2851
2852 executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2853
2854 let columns = sqlx::query("PRAGMA table_info(\"orders\")")
2855 .fetch_all(&pool)
2856 .await
2857 .unwrap()
2858 .into_iter()
2859 .map(|row| row.try_get::<String, _>("name").unwrap())
2860 .collect::<Vec<_>>();
2861
2862 assert!(columns.contains(&"id".to_owned()));
2863 assert!(columns.contains(&"version".to_owned()));
2864 assert!(columns.contains(&"name".to_owned()));
2865 }
2866
2867 #[tokio::test]
2868 async fn user_context_can_ensure_sqlite_schema_from_runtime_module() {
2869 use sqlx::Row;
2870 use sqlx::sqlite::SqlitePoolOptions;
2871
2872 let pool = SqlitePoolOptions::new()
2873 .max_connections(1)
2874 .connect("sqlite::memory:")
2875 .await
2876 .unwrap();
2877
2878 let module = super::RuntimeModule::new()
2879 .descriptor(entity())
2880 .descriptor(line_entity())
2881 .descriptor(product_entity())
2882 .initial_graph(
2883 GraphNode::new("Order")
2884 .value("id", 1_u64)
2885 .value("version", 1_i64)
2886 .value("name", "seed-order"),
2887 );
2888 let mut ctx = super::UserContext::new().with_module(module);
2889 ctx.insert_resource(SqliteDialect);
2890 ctx.insert_resource(SqliteMutationExecutor::new(pool.clone()));
2891
2892 ctx.ensure_sqlite_schema().await.unwrap();
2893 sqlx::query("UPDATE orders SET name = 'stale-seed-order' WHERE id = 1")
2894 .execute(&pool)
2895 .await
2896 .unwrap();
2897 ctx.ensure_sqlite_schema().await.unwrap();
2898
2899 let tables = sqlx::query(
2900 "SELECT name FROM sqlite_master WHERE type = 'table' AND name IN ('orders', 'orderline', 'product') ORDER BY name",
2901 )
2902 .fetch_all(&pool)
2903 .await
2904 .unwrap()
2905 .into_iter()
2906 .map(|row| row.try_get::<String, _>("name").unwrap())
2907 .collect::<Vec<_>>();
2908
2909 assert_eq!(
2910 tables,
2911 vec![
2912 "orderline".to_owned(),
2913 "orders".to_owned(),
2914 "product".to_owned()
2915 ]
2916 );
2917
2918 let seed_count: i64 =
2919 sqlx::query_scalar("SELECT COUNT(1) FROM orders WHERE id = 1 AND name = 'seed-order'")
2920 .fetch_one(&pool)
2921 .await
2922 .unwrap();
2923 assert_eq!(seed_count, 1);
2924 }
2925
2926 #[tokio::test]
2927 async fn sqlite_executor_roundtrips_json_decimal_date_and_timestamp() {
2928 use sqlx::sqlite::SqlitePoolOptions;
2929
2930 let pool = SqlitePoolOptions::new()
2931 .max_connections(1)
2932 .connect("sqlite::memory:")
2933 .await
2934 .unwrap();
2935
2936 let executor = SqliteMutationExecutor::new(pool.clone());
2937 let dialect = SqliteDialect;
2938 let entity = typed_entity();
2939 executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2940
2941 let birthday = NaiveDate::from_ymd_opt(2024, 2, 3).unwrap();
2942 let happened_at = Utc.with_ymd_and_hms(2024, 2, 3, 4, 5, 6).unwrap();
2943 let payload = serde_json::json!({"name": "teaql", "count": 2});
2944 let amount = Decimal::new(12345, 2);
2945
2946 let insert = dialect
2947 .compile_insert(
2948 &entity,
2949 &InsertCommand::new("TypedValue")
2950 .value("id", 1_u64)
2951 .value("payload", payload.clone())
2952 .value("amount", amount)
2953 .value("birthday", birthday)
2954 .value("happened_at", happened_at),
2955 )
2956 .unwrap();
2957 assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2958
2959 let select = dialect
2960 .compile_select(
2961 &entity,
2962 &SelectQuery::new("TypedValue")
2963 .project("id")
2964 .project("payload")
2965 .project("amount")
2966 .project("birthday")
2967 .project("happened_at")
2968 .filter(Expr::eq("id", 1_u64)),
2969 )
2970 .unwrap();
2971 let rows = executor.fetch_all(&select).await.unwrap();
2972 assert_eq!(rows.len(), 1);
2973 assert_eq!(rows[0].get("payload"), Some(&Value::Json(payload)));
2974 assert_eq!(rows[0].get("amount"), Some(&Value::Decimal(amount)));
2975 assert_eq!(rows[0].get("birthday"), Some(&Value::Date(birthday)));
2976 assert_eq!(
2977 rows[0].get("happened_at"),
2978 Some(&Value::Timestamp(happened_at))
2979 );
2980 }
2981
2982 #[tokio::test(flavor = "multi_thread")]
2983 async fn sqlite_fetches_enhanced_typed_entities() {
2984 use sqlx::sqlite::SqlitePoolOptions;
2985
2986 let pool = SqlitePoolOptions::new()
2987 .max_connections(1)
2988 .connect("sqlite::memory:")
2989 .await
2990 .unwrap();
2991
2992 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2993 let order = entity();
2994 let line = line_entity();
2995 let product = product_entity();
2996 mutation_executor
2997 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2998 .await
2999 .unwrap();
3000
3001 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
3002 .execute(&pool)
3003 .await
3004 .unwrap();
3005 sqlx::query(
3006 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
3007 (101, 1, 1001, 'l1'),
3008 (102, 1, 1002, 'l2')",
3009 )
3010 .execute(&pool)
3011 .await
3012 .unwrap();
3013 sqlx::query(
3014 "INSERT INTO product (id, name) VALUES
3015 (1001, 'p1'),
3016 (1002, 'p2')",
3017 )
3018 .execute(&pool)
3019 .await
3020 .unwrap();
3021
3022 let executor = SqliteSyncExecutor::new(mutation_executor);
3023 let mut ctx = UserContext::new()
3024 .with_metadata(
3025 InMemoryMetadataStore::new()
3026 .with_entity(order)
3027 .with_entity(line)
3028 .with_entity(product),
3029 )
3030 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
3031 .with_repository_behavior_registry(
3032 InMemoryRepositoryBehaviorRegistry::new()
3033 .with_behavior("Order", NestedOrderBehavior),
3034 );
3035 ctx.insert_resource(SqliteDialect);
3036 ctx.insert_resource(executor);
3037
3038 let repo = ctx
3039 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3040 .unwrap();
3041 let rows = repo
3042 .fetch_enhanced_entities::<OrderAggregateRow>(
3043 &repo
3044 .select()
3045 .project("id")
3046 .project("version")
3047 .project("name"),
3048 )
3049 .unwrap();
3050
3051 assert_eq!(rows.len(), 1);
3052 let order = rows.first().unwrap();
3053 assert_eq!(order.id, 1);
3054 assert_eq!(order.lines.len(), 2);
3055 assert_eq!(order.lines.data[0].product.as_ref().unwrap().name, "p1");
3056 assert_eq!(order.lines.data[1].product.as_ref().unwrap().name, "p2");
3057 }
3058
3059 #[tokio::test(flavor = "multi_thread")]
3060 async fn sqlite_fetches_typed_smart_list_entities() {
3061 use sqlx::sqlite::SqlitePoolOptions;
3062
3063 let pool = SqlitePoolOptions::new()
3064 .max_connections(1)
3065 .connect("sqlite::memory:")
3066 .await
3067 .unwrap();
3068
3069 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3070 let order = entity();
3071 mutation_executor
3072 .ensure_schema(&SqliteDialect, &[&order])
3073 .await
3074 .unwrap();
3075
3076 sqlx::query(
3077 "INSERT INTO orders (id, version, name) VALUES
3078 (1, 1, 'o1'),
3079 (2, 3, 'o2')",
3080 )
3081 .execute(&pool)
3082 .await
3083 .unwrap();
3084
3085 let executor = SqliteSyncExecutor::new(mutation_executor);
3086 let mut ctx = UserContext::new()
3087 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3088 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3089 ctx.insert_resource(SqliteDialect);
3090 ctx.insert_resource(executor);
3091
3092 let repo = ctx
3093 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3094 .unwrap();
3095 let rows = repo
3096 .fetch_entities::<OrderAggregateRow>(
3097 &repo
3098 .select()
3099 .project("id")
3100 .project("version")
3101 .project("name"),
3102 )
3103 .unwrap();
3104
3105 assert_eq!(rows.len(), 2);
3106 assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3107 assert_eq!(rows.versions(), vec![1, 3]);
3108 assert!(rows.data[0].lines.is_empty());
3109 assert!(rows.data[1].lines.is_empty());
3110 }
3111
3112 #[tokio::test(flavor = "multi_thread")]
3113 async fn sqlite_fetches_smart_list_of_order_entities() {
3114 use sqlx::sqlite::SqlitePoolOptions;
3115
3116 let pool = SqlitePoolOptions::new()
3117 .max_connections(1)
3118 .connect("sqlite::memory:")
3119 .await
3120 .unwrap();
3121
3122 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3123 let order = entity();
3124 mutation_executor
3125 .ensure_schema(&SqliteDialect, &[&order])
3126 .await
3127 .unwrap();
3128
3129 sqlx::query(
3130 "INSERT INTO orders (id, version, name) VALUES
3131 (1, 1, 'o1'),
3132 (2, 3, 'o2')",
3133 )
3134 .execute(&pool)
3135 .await
3136 .unwrap();
3137
3138 let executor = SqliteSyncExecutor::new(mutation_executor);
3139 let mut ctx = UserContext::new()
3140 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3141 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3142 ctx.insert_resource(SqliteDialect);
3143 ctx.insert_resource(executor);
3144
3145 let repo = ctx
3146 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3147 .unwrap();
3148 let rows = repo
3149 .fetch_entities::<Order>(
3150 &repo
3151 .select()
3152 .project("id")
3153 .project("version")
3154 .project("name"),
3155 )
3156 .unwrap();
3157
3158 assert_eq!(
3159 rows.data,
3160 vec![
3161 Order {
3162 id: 1,
3163 version: 1,
3164 name: "o1".to_owned(),
3165 },
3166 Order {
3167 id: 2,
3168 version: 3,
3169 name: "o2".to_owned(),
3170 }
3171 ]
3172 );
3173 assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3174 assert_eq!(rows.versions(), vec![1, 3]);
3175 }
3176
3177 #[tokio::test(flavor = "multi_thread")]
3178 async fn sqlite_insert_generates_missing_id() {
3179 use sqlx::sqlite::SqlitePoolOptions;
3180
3181 let pool = SqlitePoolOptions::new()
3182 .max_connections(1)
3183 .connect("sqlite::memory:")
3184 .await
3185 .unwrap();
3186
3187 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3188 let order = entity();
3189 mutation_executor
3190 .ensure_schema(&SqliteDialect, &[&order])
3191 .await
3192 .unwrap();
3193
3194 let executor = SqliteSyncExecutor::new(mutation_executor);
3195 let mut ctx = UserContext::new()
3196 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3197 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3198 ctx.insert_resource(SqliteDialect);
3199 ctx.insert_resource(executor);
3200
3201 let repo = ctx
3202 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3203 .unwrap();
3204 let affected = repo
3205 .insert(
3206 &repo
3207 .insert_command()
3208 .value("version", 1_i64)
3209 .value("name", "generated"),
3210 )
3211 .unwrap();
3212 assert_eq!(affected, 1);
3213
3214 let rows = repo
3215 .fetch_entities::<Order>(
3216 &repo
3217 .select()
3218 .project("id")
3219 .project("version")
3220 .project("name")
3221 .filter(Expr::eq("name", "generated")),
3222 )
3223 .unwrap();
3224 assert_eq!(rows.len(), 1);
3225 let order = rows.first().unwrap();
3226 assert!(order.id > 0);
3227 assert_eq!(order.version, 1);
3228 assert_eq!(order.name, "generated");
3229 }
3230
3231 #[tokio::test(flavor = "multi_thread")]
3232 async fn sqlite_save_graph_inserts_nested_rows() {
3233 use sqlx::{Row, sqlite::SqlitePoolOptions};
3234
3235 let pool = SqlitePoolOptions::new()
3236 .max_connections(1)
3237 .connect("sqlite::memory:?cache=shared")
3238 .await
3239 .unwrap();
3240
3241 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3242 let order = entity();
3243 let line = line_entity();
3244 let product = product_entity();
3245 mutation_executor
3246 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3247 .await
3248 .unwrap();
3249
3250 let executor = SqliteSyncExecutor::new(mutation_executor);
3251 let mut ctx = UserContext::new()
3252 .with_metadata(
3253 InMemoryMetadataStore::new()
3254 .with_entity(order)
3255 .with_entity(line)
3256 .with_entity(product),
3257 )
3258 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3259 ctx.insert_resource(SqliteDialect);
3260 ctx.insert_resource(executor);
3261
3262 let repo = ctx
3263 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3264 .unwrap();
3265 let graph = GraphNode::new("Order")
3266 .value("id", 1_u64)
3267 .value("version", 1_i64)
3268 .value("name", "root")
3269 .relation(
3270 "lines",
3271 GraphNode::new("OrderLine")
3272 .value("id", 10_u64)
3273 .value("name", "line-1")
3274 .relation(
3275 "product",
3276 GraphNode::new("Product")
3277 .value("id", 100_u64)
3278 .value("name", "sku-1"),
3279 ),
3280 );
3281
3282 let saved = repo.save_graph(graph).unwrap();
3283 assert_eq!(
3284 saved.relations["lines"][0].values.get("order_id"),
3285 Some(&Value::U64(1))
3286 );
3287 assert_eq!(
3288 saved.relations["lines"][0].values.get("product_id"),
3289 Some(&Value::U64(100))
3290 );
3291
3292 let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3293 .fetch_one(&pool)
3294 .await
3295 .unwrap()
3296 .try_get("count")
3297 .unwrap();
3298 let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 10")
3299 .fetch_one(&pool)
3300 .await
3301 .unwrap();
3302 let product_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM product")
3303 .fetch_one(&pool)
3304 .await
3305 .unwrap()
3306 .try_get("count")
3307 .unwrap();
3308
3309 assert_eq!(order_count, 1);
3310 assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 1);
3311 assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 100);
3312 assert_eq!(product_count, 1);
3313 }
3314
3315 #[tokio::test(flavor = "multi_thread")]
3316 async fn sqlite_save_typed_entity_graph_create_inserts_nested_rows() {
3317 use sqlx::{Row, sqlite::SqlitePoolOptions};
3318
3319 let pool = SqlitePoolOptions::new()
3320 .max_connections(1)
3321 .connect("sqlite::memory:")
3322 .await
3323 .unwrap();
3324
3325 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3326 let order = entity();
3327 let line = line_entity();
3328 let product = product_entity();
3329 mutation_executor
3330 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3331 .await
3332 .unwrap();
3333
3334 let executor = SqliteSyncExecutor::new(mutation_executor);
3335 let mut ctx = UserContext::new()
3336 .with_metadata(
3337 InMemoryMetadataStore::new()
3338 .with_entity(order)
3339 .with_entity(line)
3340 .with_entity(product),
3341 )
3342 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3343 ctx.insert_resource(SqliteDialect);
3344 ctx.insert_resource(executor);
3345
3346 let repo = ctx
3347 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3348 .unwrap();
3349 let saved = repo
3350 .save_entity_graph(OrderAggregateRow {
3351 id: 2,
3352 version: 1,
3353 name: "typed-root".to_owned(),
3354 lines: teaql_core::SmartList::from(vec![OrderLineEntityRow {
3355 id: 20,
3356 order_id: 0,
3357 name: "typed-line".to_owned(),
3358 product_id: 200,
3359 product: Some(ProductEntityRow {
3360 id: 200,
3361 name: "typed-sku".to_owned(),
3362 }),
3363 }]),
3364 })
3365 .unwrap();
3366
3367 assert_eq!(saved.values.get("id"), Some(&Value::U64(2)));
3368 assert_eq!(
3369 saved.relations["lines"][0].values.get("order_id"),
3370 Some(&Value::U64(2))
3371 );
3372 assert_eq!(
3373 saved.relations["lines"][0].values.get("product_id"),
3374 Some(&Value::U64(200))
3375 );
3376
3377 let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3378 .fetch_one(&pool)
3379 .await
3380 .unwrap()
3381 .try_get("count")
3382 .unwrap();
3383 let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 20")
3384 .fetch_one(&pool)
3385 .await
3386 .unwrap();
3387 let product_name: String = sqlx::query_scalar("SELECT name FROM product WHERE id = 200")
3388 .fetch_one(&pool)
3389 .await
3390 .unwrap();
3391
3392 assert_eq!(order_count, 1);
3393 assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 2);
3394 assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 200);
3395 assert_eq!(product_name, "typed-sku");
3396 }
3397
3398 #[tokio::test(flavor = "multi_thread")]
3399 async fn sqlite_plan_for_save_graph_assigns_ids_and_batches_before_execution() {
3400 use sqlx::sqlite::SqlitePoolOptions;
3401 use std::time::{SystemTime, UNIX_EPOCH};
3402
3403 let db_path = std::env::temp_dir().join(format!(
3404 "teaql-plan-{}-{}.db",
3405 std::process::id(),
3406 SystemTime::now()
3407 .duration_since(UNIX_EPOCH)
3408 .unwrap()
3409 .as_nanos()
3410 ));
3411 let db_url = format!("sqlite://{}?mode=rwc", db_path.display());
3412 let pool = SqlitePoolOptions::new()
3413 .max_connections(1)
3414 .connect(&db_url)
3415 .await
3416 .unwrap();
3417
3418 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3419 let order = entity();
3420 let line = line_entity();
3421 let product = product_entity();
3422 mutation_executor
3423 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3424 .await
3425 .unwrap();
3426
3427 sqlx::query("INSERT INTO orders (id, version, name) VALUES (100, 1, 'existing')")
3428 .execute(&pool)
3429 .await
3430 .unwrap();
3431 sqlx::query(
3432 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3433 (200, 1, 100, 301, 'line-a'),
3434 (201, 1, 100, 302, 'line-b')",
3435 )
3436 .execute(&pool)
3437 .await
3438 .unwrap();
3439 let seeded_lines: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM orderline")
3440 .fetch_one(&pool)
3441 .await
3442 .unwrap();
3443 assert_eq!(seeded_lines, 2);
3444
3445 let executor = SqliteSyncExecutor::new(mutation_executor);
3446 let mut ctx = UserContext::new()
3447 .with_metadata(
3448 InMemoryMetadataStore::new()
3449 .with_entity(order)
3450 .with_entity(line)
3451 .with_entity(product),
3452 )
3453 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
3454 .with_internal_id_generator(SqliteIdSpaceGenerator::new(pool.clone()));
3455 ctx.insert_resource(SqliteDialect);
3456 ctx.insert_resource(executor);
3457
3458 let graph = GraphNode::new("Order")
3459 .value("id", 100_u64)
3460 .value("version", 1_i64)
3461 .value("name", "existing-updated")
3462 .relation(
3463 "lines",
3464 GraphNode::new("OrderLine")
3465 .value("name", "new-line-a")
3466 .value("product_id", 401_u64),
3467 )
3468 .relation(
3469 "lines",
3470 GraphNode::new("OrderLine")
3471 .value("name", "new-line-b")
3472 .value("product_id", 402_u64),
3473 )
3474 .relation(
3475 "lines",
3476 GraphNode::new("OrderLine")
3477 .value("id", 200_u64)
3478 .value("version", 1_i64)
3479 .value("name", "line-a-updated"),
3480 )
3481 .relation(
3482 "lines",
3483 GraphNode::new("OrderLine")
3484 .value("id", 201_u64)
3485 .value("version", 1_i64)
3486 .value("name", "line-b-updated"),
3487 );
3488
3489 let plan = ctx
3490 .plan_for_save_graph::<SqliteDialect, SqliteSyncExecutor>(graph)
3491 .unwrap();
3492 let counts = plan.grouped_counts();
3493 assert_eq!(
3494 counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
3495 Some(&1)
3496 );
3497 assert_eq!(
3498 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
3499 Some(&2)
3500 );
3501 assert_eq!(
3502 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
3503 Some(&2)
3504 );
3505
3506 let create_batch = plan
3507 .batches
3508 .iter()
3509 .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
3510 .unwrap();
3511 assert_eq!(create_batch.items.len(), 2);
3512 assert_eq!(create_batch.items[0].values.get("id"), Some(&Value::U64(1)));
3513 assert_eq!(create_batch.items[1].values.get("id"), Some(&Value::U64(2)));
3514
3515 let update_batch = plan
3516 .batches
3517 .iter()
3518 .find(|batch| {
3519 batch.entity == "OrderLine"
3520 && batch.kind == GraphMutationKind::Update
3521 && batch.update_fields == vec!["name".to_owned()]
3522 })
3523 .unwrap();
3524 assert_eq!(update_batch.items.len(), 2);
3525
3526 let repo = ctx
3527 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3528 .unwrap();
3529 let saved = repo.execute_graph_plan(plan).unwrap();
3530 let lines = saved.relations.get("lines").unwrap();
3531 assert_eq!(lines.len(), 4);
3532
3533 let generated_count: i64 = sqlx::query_scalar(
3534 "SELECT COUNT(*) FROM orderline WHERE id IN (1, 2) AND order_id = 100",
3535 )
3536 .fetch_one(&pool)
3537 .await
3538 .unwrap();
3539 assert_eq!(generated_count, 2);
3540 let updated_name: String = sqlx::query_scalar("SELECT name FROM orderline WHERE id = 200")
3541 .fetch_one(&pool)
3542 .await
3543 .unwrap();
3544 assert_eq!(updated_name, "line-a-updated");
3545 pool.close().await;
3546 let _ = std::fs::remove_file(db_path);
3547 }
3548
3549 #[tokio::test(flavor = "multi_thread")]
3550 async fn sqlite_save_graph_updates_nested_rows_and_deletes_missing_children() {
3551 use sqlx::{Row, sqlite::SqlitePoolOptions};
3552
3553 let pool = SqlitePoolOptions::new()
3554 .max_connections(1)
3555 .connect("sqlite::memory:")
3556 .await
3557 .unwrap();
3558
3559 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3560 let order = entity();
3561 let line = line_entity();
3562 let product = product_entity();
3563 mutation_executor
3564 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3565 .await
3566 .unwrap();
3567
3568 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'old-root')")
3569 .execute(&pool)
3570 .await
3571 .unwrap();
3572 sqlx::query(
3573 "INSERT INTO product (id, name) VALUES
3574 (100, 'old-sku'),
3575 (101, 'removed-sku')",
3576 )
3577 .execute(&pool)
3578 .await
3579 .unwrap();
3580 sqlx::query(
3581 "INSERT INTO orderline (id, order_id, product_id, name) VALUES
3582 (10, 1, 100, 'old-line'),
3583 (11, 1, 101, 'removed-line')",
3584 )
3585 .execute(&pool)
3586 .await
3587 .unwrap();
3588 sqlx::query("UPDATE orderline SET version = 1")
3589 .execute(&pool)
3590 .await
3591 .unwrap();
3592
3593 let executor = SqliteSyncExecutor::new(mutation_executor);
3594 let mut ctx = UserContext::new()
3595 .with_metadata(
3596 InMemoryMetadataStore::new()
3597 .with_entity(order)
3598 .with_entity(line)
3599 .with_entity(product),
3600 )
3601 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3602 ctx.insert_resource(SqliteDialect);
3603 ctx.insert_resource(executor);
3604
3605 let repo = ctx
3606 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3607 .unwrap();
3608 let graph = GraphNode::new("Order")
3609 .value("id", 1_u64)
3610 .value("version", 1_i64)
3611 .value("name", "new-root")
3612 .relation(
3613 "lines",
3614 GraphNode::new("OrderLine")
3615 .value("id", 10_u64)
3616 .value("version", 1_i64)
3617 .value("name", "new-line")
3618 .relation(
3619 "product",
3620 GraphNode::new("Product")
3621 .value("id", 100_u64)
3622 .value("name", "new-sku"),
3623 ),
3624 )
3625 .relation(
3626 "lines",
3627 GraphNode::new("OrderLine")
3628 .value("id", 12_u64)
3629 .value("version", 1_i64)
3630 .value("name", "added-line")
3631 .relation(
3632 "product",
3633 GraphNode::new("Product")
3634 .value("id", 102_u64)
3635 .value("name", "added-sku"),
3636 ),
3637 );
3638
3639 let saved = repo.save_graph(graph).unwrap();
3640 assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
3641
3642 let order_row = sqlx::query("SELECT version, name FROM orders WHERE id = 1")
3643 .fetch_one(&pool)
3644 .await
3645 .unwrap();
3646 assert_eq!(order_row.try_get::<i64, _>("version").unwrap(), 2);
3647 assert_eq!(order_row.try_get::<String, _>("name").unwrap(), "new-root");
3648
3649 let updated_line =
3650 sqlx::query("SELECT version, product_id, name FROM orderline WHERE id = 10")
3651 .fetch_one(&pool)
3652 .await
3653 .unwrap();
3654 assert_eq!(updated_line.try_get::<i64, _>("version").unwrap(), 2);
3655 assert_eq!(updated_line.try_get::<i64, _>("product_id").unwrap(), 100);
3656 assert_eq!(
3657 updated_line.try_get::<String, _>("name").unwrap(),
3658 "new-line"
3659 );
3660
3661 let added_line =
3662 sqlx::query("SELECT order_id, product_id, name FROM orderline WHERE id = 12")
3663 .fetch_one(&pool)
3664 .await
3665 .unwrap();
3666 assert_eq!(added_line.try_get::<i64, _>("order_id").unwrap(), 1);
3667 assert_eq!(added_line.try_get::<i64, _>("product_id").unwrap(), 102);
3668 assert_eq!(
3669 added_line.try_get::<String, _>("name").unwrap(),
3670 "added-line"
3671 );
3672
3673 let deleted_line = sqlx::query("SELECT version FROM orderline WHERE id = 11")
3674 .fetch_one(&pool)
3675 .await
3676 .unwrap();
3677 assert_eq!(deleted_line.try_get::<i64, _>("version").unwrap(), -2);
3678
3679 let updated_product = sqlx::query("SELECT name FROM product WHERE id = 100")
3680 .fetch_one(&pool)
3681 .await
3682 .unwrap();
3683 assert_eq!(
3684 updated_product.try_get::<String, _>("name").unwrap(),
3685 "new-sku"
3686 );
3687 }
3688
3689 #[tokio::test(flavor = "multi_thread")]
3690 async fn sqlite_save_graph_supports_reference_remove_and_keep_missing() {
3691 use sqlx::{Row, sqlite::SqlitePoolOptions};
3692
3693 let pool = SqlitePoolOptions::new()
3694 .max_connections(1)
3695 .connect("sqlite::memory:")
3696 .await
3697 .unwrap();
3698
3699 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3700 let order = sqlite_entity_keep_missing();
3701 let line = line_entity();
3702 let product = product_entity();
3703 mutation_executor
3704 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3705 .await
3706 .unwrap();
3707
3708 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3709 .execute(&pool)
3710 .await
3711 .unwrap();
3712 sqlx::query("INSERT INTO product (id, name) VALUES (100, 'reference-only')")
3713 .execute(&pool)
3714 .await
3715 .unwrap();
3716 sqlx::query(
3717 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3718 (10, 1, 1, 100, 'remove-me'),
3719 (11, 1, 1, 100, 'keep-me')",
3720 )
3721 .execute(&pool)
3722 .await
3723 .unwrap();
3724
3725 let executor = SqliteSyncExecutor::new(mutation_executor);
3726 let mut ctx = UserContext::new()
3727 .with_metadata(
3728 InMemoryMetadataStore::new()
3729 .with_entity(order)
3730 .with_entity(line)
3731 .with_entity(product),
3732 )
3733 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3734 ctx.insert_resource(SqliteDialect);
3735 ctx.insert_resource(executor);
3736
3737 let repo = ctx
3738 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3739 .unwrap();
3740 let graph = GraphNode::new("Order")
3741 .value("id", 1_u64)
3742 .value("version", 1_i64)
3743 .value("name", "root-updated")
3744 .relation(
3745 "lines",
3746 GraphNode::new("OrderLine")
3747 .value("id", 10_u64)
3748 .value("version", 1_i64)
3749 .remove(),
3750 )
3751 .relation(
3752 "lines",
3753 GraphNode::new("OrderLine")
3754 .value("id", 12_u64)
3755 .value("version", 1_i64)
3756 .value("name", "new-reference-line")
3757 .relation(
3758 "product",
3759 GraphNode::new("Product").value("id", 100_u64).reference(),
3760 ),
3761 );
3762
3763 let saved = repo.save_graph(graph).unwrap();
3764 assert_eq!(
3765 saved.relations["lines"][1].relations["product"][0]
3766 .values
3767 .get("name"),
3768 Some(&Value::Text("reference-only".to_owned()))
3769 );
3770
3771 let removed = sqlx::query("SELECT version FROM orderline WHERE id = 10")
3772 .fetch_one(&pool)
3773 .await
3774 .unwrap();
3775 assert_eq!(removed.try_get::<i64, _>("version").unwrap(), -2);
3776
3777 let kept = sqlx::query("SELECT version, name FROM orderline WHERE id = 11")
3778 .fetch_one(&pool)
3779 .await
3780 .unwrap();
3781 assert_eq!(kept.try_get::<i64, _>("version").unwrap(), 1);
3782 assert_eq!(kept.try_get::<String, _>("name").unwrap(), "keep-me");
3783
3784 let added = sqlx::query("SELECT product_id FROM orderline WHERE id = 12")
3785 .fetch_one(&pool)
3786 .await
3787 .unwrap();
3788 assert_eq!(added.try_get::<i64, _>("product_id").unwrap(), 100);
3789
3790 let product = sqlx::query("SELECT name FROM product WHERE id = 100")
3791 .fetch_one(&pool)
3792 .await
3793 .unwrap();
3794 assert_eq!(
3795 product.try_get::<String, _>("name").unwrap(),
3796 "reference-only"
3797 );
3798 }
3799
3800 #[tokio::test(flavor = "multi_thread")]
3801 async fn sqlite_save_graph_rejects_invalid_reference_and_state_transitions() {
3802 use sqlx::sqlite::SqlitePoolOptions;
3803
3804 let pool = SqlitePoolOptions::new()
3805 .max_connections(1)
3806 .connect("sqlite::memory:")
3807 .await
3808 .unwrap();
3809
3810 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3811 let order = entity();
3812 let line = line_entity();
3813 let product = product_entity();
3814 mutation_executor
3815 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3816 .await
3817 .unwrap();
3818
3819 sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3820 .execute(&pool)
3821 .await
3822 .unwrap();
3823 sqlx::query("INSERT INTO product (id, name) VALUES (100, 'valid-reference')")
3824 .execute(&pool)
3825 .await
3826 .unwrap();
3827 sqlx::query(
3828 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3829 (10, 1, 1, 100, 'line-10')",
3830 )
3831 .execute(&pool)
3832 .await
3833 .unwrap();
3834
3835 let executor = SqliteSyncExecutor::new(mutation_executor);
3836 let mut ctx = UserContext::new()
3837 .with_metadata(
3838 InMemoryMetadataStore::new()
3839 .with_entity(order)
3840 .with_entity(line)
3841 .with_entity(product),
3842 )
3843 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3844 ctx.insert_resource(SqliteDialect);
3845 ctx.insert_resource(executor);
3846
3847 let repo = ctx
3848 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3849 .unwrap();
3850
3851 let missing_reference = repo.save_graph(
3852 GraphNode::new("Order").value("id", 1_u64).relation(
3853 "lines",
3854 GraphNode::new("OrderLine")
3855 .value("id", 12_u64)
3856 .value("version", 1_i64)
3857 .value("name", "bad-reference")
3858 .relation(
3859 "product",
3860 GraphNode::new("Product").value("id", 999_u64).reference(),
3861 ),
3862 ),
3863 );
3864 assert!(format!("{missing_reference:?}").contains("does not exist"));
3865
3866 let mutable_reference = repo.save_graph(
3867 GraphNode::new("Order").value("id", 1_u64).relation(
3868 "lines",
3869 GraphNode::new("OrderLine")
3870 .value("id", 12_u64)
3871 .value("version", 1_i64)
3872 .value("name", "mutable-reference")
3873 .relation(
3874 "product",
3875 GraphNode::new("Product")
3876 .value("id", 100_u64)
3877 .value("name", "should-not-mutate")
3878 .reference(),
3879 ),
3880 ),
3881 );
3882 assert!(format!("{mutable_reference:?}").contains("cannot carry mutable field"));
3883
3884 let duplicate_child = repo.save_graph(
3885 GraphNode::new("Order")
3886 .value("id", 1_u64)
3887 .relation(
3888 "lines",
3889 GraphNode::new("OrderLine")
3890 .value("id", 10_u64)
3891 .value("version", 1_i64)
3892 .reference(),
3893 )
3894 .relation(
3895 "lines",
3896 GraphNode::new("OrderLine")
3897 .value("id", 10_u64)
3898 .value("version", 1_i64)
3899 .reference(),
3900 ),
3901 );
3902 assert!(format!("{duplicate_child:?}").contains("duplicate child id"));
3903
3904 let reference_version_conflict = repo.save_graph(
3905 GraphNode::new("Order").value("id", 1_u64).relation(
3906 "lines",
3907 GraphNode::new("OrderLine")
3908 .value("id", 10_u64)
3909 .value("version", 2_i64)
3910 .reference(),
3911 ),
3912 );
3913 assert!(format!("{reference_version_conflict:?}").contains("OptimisticLockConflict"));
3914
3915 let remove_with_child = repo.save_graph(
3916 GraphNode::new("Order").value("id", 1_u64).relation(
3917 "lines",
3918 GraphNode::new("OrderLine")
3919 .value("id", 10_u64)
3920 .value("version", 1_i64)
3921 .relation(
3922 "product",
3923 GraphNode::new("Product").value("id", 100_u64).reference(),
3924 )
3925 .remove(),
3926 ),
3927 );
3928 assert!(format!("{remove_with_child:?}").contains("cannot contain child relations"));
3929
3930 let remove = repo.save_graph(GraphNode::new("Order").value("id", 999_u64).remove());
3931 assert!(format!("{remove:?}").contains("does not exist"));
3932 }
3933
3934 #[tokio::test(flavor = "multi_thread")]
3935 async fn sqlite_graph_write_rolls_back_all_batches_on_failure() {
3936 use sqlx::{Row, sqlite::SqlitePoolOptions};
3937
3938 let pool = SqlitePoolOptions::new()
3939 .max_connections(1)
3940 .connect("sqlite::memory:")
3941 .await
3942 .unwrap();
3943
3944 let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3945 let order = entity();
3946 let line = line_entity();
3947 let product = product_entity();
3948 mutation_executor
3949 .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3950 .await
3951 .unwrap();
3952
3953 let executor = SqliteSyncExecutor::new(mutation_executor.clone());
3954 let mut ctx = UserContext::new()
3955 .with_metadata(
3956 InMemoryMetadataStore::new()
3957 .with_entity(order)
3958 .with_entity(line)
3959 .with_entity(product),
3960 )
3961 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3962 ctx.insert_resource(SqliteDialect);
3963 ctx.insert_resource(executor);
3964
3965 let repo = ctx
3966 .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3967 .unwrap();
3968 let error = repo.save_graph(
3969 GraphNode::new("Order")
3970 .value("id", 1_u64)
3971 .value("version", 1_i64)
3972 .value("name", "rollback-root")
3973 .relation(
3974 "lines",
3975 GraphNode::new("OrderLine")
3976 .value("id", 10_u64)
3977 .value("version", 1_i64)
3978 .value("name", "rollback-line")
3979 .value("product_id", 999_u64)
3980 .relation(
3981 "product",
3982 GraphNode::new("Product").value("id", 999_u64).reference(),
3983 ),
3984 ),
3985 );
3986 assert!(format!("{error:?}").contains("does not exist"));
3987
3988 let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3989 .fetch_one(&pool)
3990 .await
3991 .unwrap()
3992 .try_get("count")
3993 .unwrap();
3994 assert_eq!(count, 0);
3995 }
3996
3997 #[tokio::test]
3998 async fn postgres_executor_ensure_schema_roundtrip_when_database_is_available() {
3999 use sqlx::{PgPool, Row};
4000
4001 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4002 return;
4003 };
4004
4005 let pool = PgPool::connect(&database_url).await.unwrap();
4006 sqlx::query("DROP TABLE IF EXISTS orderline")
4007 .execute(&pool)
4008 .await
4009 .unwrap();
4010 sqlx::query("DROP TABLE IF EXISTS orders")
4011 .execute(&pool)
4012 .await
4013 .unwrap();
4014
4015 let executor = PgMutationExecutor::new(pool.clone());
4016 let dialect = PostgresDialect;
4017 let order = entity();
4018 let line = line_entity();
4019
4020 executor
4021 .ensure_schema(&dialect, &[&order, &line])
4022 .await
4023 .unwrap();
4024
4025 let tables = sqlx::query(
4026 "SELECT table_name
4027 FROM information_schema.tables
4028 WHERE table_schema = current_schema()
4029 AND table_name IN ('orders', 'orderline')
4030 ORDER BY table_name",
4031 )
4032 .fetch_all(&pool)
4033 .await
4034 .unwrap()
4035 .into_iter()
4036 .map(|row| row.try_get::<String, _>("table_name").unwrap())
4037 .collect::<Vec<_>>();
4038 assert_eq!(tables, vec!["orderline".to_owned(), "orders".to_owned()]);
4039
4040 let soundex: String = sqlx::query_scalar("SELECT soundex('Robert')")
4041 .fetch_one(&pool)
4042 .await
4043 .unwrap();
4044 assert_eq!(soundex, "R163");
4045
4046 sqlx::query(
4047 "INSERT INTO orders (id, version, name) VALUES
4048 (1, 1, 'draft'),
4049 (2, 1, 'submitted'),
4050 (3, 1, 'archived')",
4051 )
4052 .execute(&pool)
4053 .await
4054 .unwrap();
4055
4056 let array_bound_query = dialect
4057 .compile_select(
4058 &order,
4059 &SelectQuery::new("Order")
4060 .filter(
4061 Expr::in_large(
4062 "id",
4063 vec![Value::from(1_u64), Value::from(2_u64), Value::from(3_u64)],
4064 )
4065 .and_expr(Expr::not_in_large("name", vec![Value::from("archived")])),
4066 )
4067 .order_asc("id"),
4068 )
4069 .unwrap();
4070 assert_eq!(
4071 array_bound_query.sql,
4072 format!(
4073 "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE ((\"id\" = ANY($1)) AND (\"name\" <> ALL($2))) ORDER BY \"id\" ASC"
4074 )
4075 );
4076 let rows = executor.fetch_all(&array_bound_query).await.unwrap();
4077 assert_eq!(rows.len(), 2);
4078 assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4079 assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4080
4081 sqlx::query(
4082 "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
4083 (11, 1, 1, 101, 'line-1'),
4084 (12, 1, 2, 102, 'line-2'),
4085 (13, 1, 3, 103, 'archived-line')",
4086 )
4087 .execute(&pool)
4088 .await
4089 .unwrap();
4090
4091 let subquery = dialect
4092 .compile_select(
4093 &order,
4094 &SelectQuery::new("Order")
4095 .filter(Expr::in_subquery(
4096 "id",
4097 line.clone(),
4098 SelectQuery::new("OrderLine").filter(Expr::contain("name", "line-")),
4099 "order_id",
4100 ))
4101 .order_asc("id"),
4102 )
4103 .unwrap();
4104 assert_eq!(
4105 subquery.sql,
4106 format!(
4107 "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE (\"id\" IN (SELECT \"order_id\" FROM \"orderline\" WHERE (\"name\" LIKE $1))) ORDER BY \"id\" ASC"
4108 )
4109 );
4110 let rows = executor.fetch_all(&subquery).await.unwrap();
4111 assert_eq!(rows.len(), 2);
4112 assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4113 assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4114
4115 let projected = dialect
4116 .compile_select(
4117 &order,
4118 &SelectQuery::new("Order")
4119 .project("id")
4120 .project_expr("nameSound", Expr::soundex(Expr::column("name")))
4121 .order_gbk_asc("name")
4122 .limit(1),
4123 )
4124 .unwrap();
4125 assert_eq!(
4126 projected.sql,
4127 "SELECT \"id\", SOUNDEX(\"name\") AS \"nameSound\" FROM \"orders\" ORDER BY convert_to(\"name\", 'GBK') ASC LIMIT 1"
4128 );
4129 let rows = executor.fetch_all(&projected).await.unwrap();
4130 assert_eq!(rows.len(), 1);
4131 assert!(rows[0].contains_key("nameSound"));
4132
4133 let aggregate = dialect
4134 .compile_select(
4135 &order,
4136 &SelectQuery::new("Order")
4137 .count("total")
4138 .stddev("version", "stddevVersion")
4139 .var_pop("version", "varPopVersion")
4140 .bit_or("version", "bitOrVersion")
4141 .having(Expr::binary(
4142 Expr::count_all(),
4143 teaql_core::BinaryOp::Gt,
4144 Expr::value(2_i64),
4145 )),
4146 )
4147 .unwrap();
4148 assert_eq!(
4149 aggregate.sql,
4150 "SELECT COUNT(*) AS \"total\", STDDEV(\"version\") AS \"stddevVersion\", VAR_POP(\"version\") AS \"varPopVersion\", BIT_OR(\"version\") AS \"bitOrVersion\" FROM \"orders\" HAVING (COUNT(*) > $1)"
4151 );
4152 let rows = executor.fetch_all(&aggregate).await.unwrap();
4153 assert_eq!(rows.len(), 1);
4154 assert_eq!(rows[0].get("total"), Some(&Value::I64(3)));
4155 assert!(matches!(
4156 rows[0].get("stddevVersion"),
4157 Some(Value::Decimal(_))
4158 ));
4159 assert!(matches!(
4160 rows[0].get("varPopVersion"),
4161 Some(Value::Decimal(_))
4162 ));
4163 assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(1)));
4164
4165 sqlx::query("DROP TABLE orderline")
4166 .execute(&pool)
4167 .await
4168 .unwrap();
4169 sqlx::query("CREATE TABLE orderline (id BIGINT PRIMARY KEY)")
4170 .execute(&pool)
4171 .await
4172 .unwrap();
4173
4174 executor.ensure_schema(&dialect, &[&line]).await.unwrap();
4175
4176 let columns = sqlx::query(
4177 "SELECT column_name
4178 FROM information_schema.columns
4179 WHERE table_schema = current_schema()
4180 AND table_name = 'orderline'
4181 ORDER BY column_name",
4182 )
4183 .fetch_all(&pool)
4184 .await
4185 .unwrap()
4186 .into_iter()
4187 .map(|row| row.try_get::<String, _>("column_name").unwrap())
4188 .collect::<Vec<_>>();
4189 assert!(columns.contains(&"id".to_owned()));
4190 assert!(columns.contains(&"order_id".to_owned()));
4191 assert!(columns.contains(&"product_id".to_owned()));
4192 assert!(columns.contains(&"name".to_owned()));
4193
4194 sqlx::query("DROP TABLE IF EXISTS orderline")
4195 .execute(&pool)
4196 .await
4197 .unwrap();
4198 sqlx::query("DROP TABLE IF EXISTS orders")
4199 .execute(&pool)
4200 .await
4201 .unwrap();
4202 }
4203
4204 #[tokio::test(flavor = "multi_thread")]
4205 async fn postgres_id_space_generator_prepares_repository_insert_when_database_is_available() {
4206 use sqlx::{PgPool, Row};
4207
4208 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4209 return;
4210 };
4211
4212 let pool = PgPool::connect(&database_url).await.unwrap();
4213 sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4214 .execute(&pool)
4215 .await
4216 .unwrap();
4217 sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4218 .execute(&pool)
4219 .await
4220 .unwrap();
4221
4222 let order = entity().table_name("orders_idgen");
4223 let executor = PgMutationExecutor::new(pool.clone());
4224 executor
4225 .ensure_schema(&PostgresDialect, &[&order])
4226 .await
4227 .unwrap();
4228
4229 let mut ctx = UserContext::new()
4230 .with_metadata(InMemoryMetadataStore::new().with_entity(order))
4231 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
4232 .with_internal_id_generator(PgIdSpaceGenerator::new(pool.clone()));
4233 ctx.insert_resource(PostgresDialect);
4234 ctx.insert_resource(PgSyncExecutor::new(executor));
4235
4236 let repo = ctx
4237 .resolve_repository::<PostgresDialect, PgSyncExecutor>("Order")
4238 .unwrap();
4239 repo.insert(
4240 &repo
4241 .insert_command()
4242 .value("version", 1_i64)
4243 .value("name", "generated-1"),
4244 )
4245 .unwrap();
4246 repo.insert(
4247 &repo
4248 .insert_command()
4249 .value("version", 1_i64)
4250 .value("name", "generated-2"),
4251 )
4252 .unwrap();
4253
4254 let ids = sqlx::query("SELECT id FROM orders_idgen ORDER BY id")
4255 .fetch_all(&pool)
4256 .await
4257 .unwrap()
4258 .into_iter()
4259 .map(|row| row.try_get::<i64, _>("id").unwrap())
4260 .collect::<Vec<_>>();
4261 assert_eq!(ids, vec![1, 2]);
4262
4263 let current: i64 = sqlx::query_scalar(
4264 "SELECT current_level FROM teaql_id_space WHERE type_name = 'Order'",
4265 )
4266 .fetch_one(&pool)
4267 .await
4268 .unwrap();
4269 assert_eq!(current, 2);
4270
4271 sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4272 .execute(&pool)
4273 .await
4274 .unwrap();
4275 sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4276 .execute(&pool)
4277 .await
4278 .unwrap();
4279 }
4280
4281 #[tokio::test]
4282 async fn postgres_executor_runs_extended_aggregates_when_database_is_available() {
4283 use sqlx::PgPool;
4284
4285 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4286 return;
4287 };
4288
4289 let pool = PgPool::connect(&database_url).await.unwrap();
4290 sqlx::query("DROP TABLE IF EXISTS orders_agg")
4291 .execute(&pool)
4292 .await
4293 .unwrap();
4294
4295 let order = entity().table_name("orders_agg");
4296 let executor = PgMutationExecutor::new(pool.clone());
4297 let dialect = PostgresDialect;
4298 executor.ensure_schema(&dialect, &[&order]).await.unwrap();
4299
4300 sqlx::query(
4301 "INSERT INTO orders_agg (id, version, name) VALUES
4302 (1, 1, 'A'),
4303 (2, 2, 'A'),
4304 (3, 3, 'B'),
4305 (4, 4, 'B')",
4306 )
4307 .execute(&pool)
4308 .await
4309 .unwrap();
4310
4311 let aggregate = dialect
4312 .compile_select(
4313 &order,
4314 &SelectQuery::new("Order")
4315 .count("total")
4316 .count_field("version", "versionCount")
4317 .sum("version", "sumVersion")
4318 .avg("version", "avgVersion")
4319 .min("version", "minVersion")
4320 .max("version", "maxVersion")
4321 .stddev("version", "stddevVersion")
4322 .stddev_pop("version", "stddevPopVersion")
4323 .var_samp("version", "varSampVersion")
4324 .var_pop("version", "varPopVersion")
4325 .bit_and("version", "bitAndVersion")
4326 .bit_or("version", "bitOrVersion")
4327 .bit_xor("version", "bitXorVersion")
4328 .having(Expr::binary(
4329 Expr::count_all(),
4330 teaql_core::BinaryOp::Gt,
4331 Expr::value(3_i64),
4332 )),
4333 )
4334 .unwrap();
4335 let rows = executor.fetch_all(&aggregate).await.unwrap();
4336 assert_eq!(rows.len(), 1);
4337 let row = &rows[0];
4338 assert_eq!(row.get("total"), Some(&Value::I64(4)));
4339 assert_eq!(row.get("versionCount"), Some(&Value::I64(4)));
4340 assert_eq!(
4341 row.get("sumVersion"),
4342 Some(&Value::Decimal(Decimal::new(10, 0)))
4343 );
4344 assert_eq!(
4345 row.get("avgVersion"),
4346 Some(&Value::Decimal(Decimal::new(25, 1)))
4347 );
4348 assert_eq!(row.get("minVersion"), Some(&Value::I64(1)));
4349 assert_eq!(row.get("maxVersion"), Some(&Value::I64(4)));
4350 assert!(matches!(row.get("stddevVersion"), Some(Value::Decimal(_))));
4351 assert!(matches!(
4352 row.get("stddevPopVersion"),
4353 Some(Value::Decimal(_))
4354 ));
4355 assert!(matches!(row.get("varSampVersion"), Some(Value::Decimal(_))));
4356 assert!(matches!(row.get("varPopVersion"), Some(Value::Decimal(_))));
4357 assert_eq!(row.get("bitAndVersion"), Some(&Value::I64(0)));
4358 assert_eq!(row.get("bitOrVersion"), Some(&Value::I64(7)));
4359 assert_eq!(row.get("bitXorVersion"), Some(&Value::I64(4)));
4360
4361 let grouped = dialect
4362 .compile_select(
4363 &order,
4364 &SelectQuery::new("Order")
4365 .group_by("name")
4366 .count("total")
4367 .sum("version", "sumVersion")
4368 .having(Expr::binary(
4369 Expr::count_all(),
4370 teaql_core::BinaryOp::Gt,
4371 Expr::value(1_i64),
4372 ))
4373 .order_asc("name"),
4374 )
4375 .unwrap();
4376 let rows = executor.fetch_all(&grouped).await.unwrap();
4377 assert_eq!(rows.len(), 2);
4378 assert_eq!(rows[0].get("name"), Some(&Value::Text("A".to_owned())));
4379 assert_eq!(rows[0].get("total"), Some(&Value::I64(2)));
4380 assert_eq!(
4381 rows[0].get("sumVersion"),
4382 Some(&Value::Decimal(Decimal::new(3, 0)))
4383 );
4384 assert_eq!(rows[1].get("name"), Some(&Value::Text("B".to_owned())));
4385 assert_eq!(rows[1].get("total"), Some(&Value::I64(2)));
4386 assert_eq!(
4387 rows[1].get("sumVersion"),
4388 Some(&Value::Decimal(Decimal::new(7, 0)))
4389 );
4390
4391 sqlx::query("DROP TABLE IF EXISTS orders_agg")
4392 .execute(&pool)
4393 .await
4394 .unwrap();
4395 }
4396
4397 #[tokio::test]
4398 async fn user_context_can_ensure_postgres_schema_when_database_is_available() {
4399 use sqlx::{PgPool, Row};
4400
4401 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4402 return;
4403 };
4404
4405 let pool = PgPool::connect(&database_url).await.unwrap();
4406 sqlx::query("DROP TABLE IF EXISTS product_ctx")
4407 .execute(&pool)
4408 .await
4409 .unwrap();
4410 sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4411 .execute(&pool)
4412 .await
4413 .unwrap();
4414 sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4415 .execute(&pool)
4416 .await
4417 .unwrap();
4418
4419 let order = entity().table_name("orders_ctx");
4420 let line = line_entity().table_name("orderline_ctx");
4421 let product = product_entity().table_name("product_ctx");
4422 let module = super::RuntimeModule::new()
4423 .descriptor(order)
4424 .descriptor(line)
4425 .descriptor(product);
4426 let mut ctx = super::UserContext::new().with_module(module);
4427 ctx.insert_resource(PostgresDialect);
4428 ctx.insert_resource(PgMutationExecutor::new(pool.clone()));
4429
4430 ctx.ensure_postgres_schema().await.unwrap();
4431
4432 let tables = sqlx::query(
4433 "SELECT table_name
4434 FROM information_schema.tables
4435 WHERE table_schema = current_schema()
4436 AND table_name IN ('orders_ctx', 'orderline_ctx', 'product_ctx')
4437 ORDER BY table_name",
4438 )
4439 .fetch_all(&pool)
4440 .await
4441 .unwrap()
4442 .into_iter()
4443 .map(|row| row.try_get::<String, _>("table_name").unwrap())
4444 .collect::<Vec<_>>();
4445
4446 assert_eq!(
4447 tables,
4448 vec![
4449 "orderline_ctx".to_owned(),
4450 "orders_ctx".to_owned(),
4451 "product_ctx".to_owned()
4452 ]
4453 );
4454
4455 sqlx::query("DROP TABLE IF EXISTS product_ctx")
4456 .execute(&pool)
4457 .await
4458 .unwrap();
4459 sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4460 .execute(&pool)
4461 .await
4462 .unwrap();
4463 sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4464 .execute(&pool)
4465 .await
4466 .unwrap();
4467 }
4468
4469 #[tokio::test(flavor = "multi_thread")]
4470 async fn postgres_graph_write_transaction_rolls_back_when_database_is_available() {
4471 use sqlx::{PgPool, Row};
4472
4473 let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4474 return;
4475 };
4476
4477 let pool = PgPool::connect(&database_url).await.unwrap();
4478 sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4479 .execute(&pool)
4480 .await
4481 .unwrap();
4482 sqlx::query("DROP TABLE IF EXISTS product_tx")
4483 .execute(&pool)
4484 .await
4485 .unwrap();
4486 sqlx::query("DROP TABLE IF EXISTS orders_tx")
4487 .execute(&pool)
4488 .await
4489 .unwrap();
4490
4491 let order = entity().table_name("orders_tx");
4492 let line = line_entity().table_name("orderline_tx");
4493 let product = product_entity().table_name("product_tx");
4494 let schema_executor = PgMutationExecutor::new(pool.clone());
4495 schema_executor
4496 .ensure_schema(&PostgresDialect, &[&order, &line, &product])
4497 .await
4498 .unwrap();
4499
4500 let tx_executor = PgTransactionExecutor::begin(&pool).await.unwrap();
4501 let sync_executor = PgTxSyncExecutor::new(tx_executor.clone());
4502 let mut ctx = UserContext::new()
4503 .with_metadata(
4504 InMemoryMetadataStore::new()
4505 .with_entity(order)
4506 .with_entity(line)
4507 .with_entity(product),
4508 )
4509 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
4510 ctx.insert_resource(PostgresDialect);
4511 ctx.insert_resource(sync_executor);
4512
4513 let repo = ctx
4514 .resolve_repository::<PostgresDialect, PgTxSyncExecutor>("Order")
4515 .unwrap();
4516 repo.save_graph(
4517 GraphNode::new("Order")
4518 .value("id", 1_u64)
4519 .value("version", 1_i64)
4520 .value("name", "pg-rollback-root")
4521 .relation(
4522 "lines",
4523 GraphNode::new("OrderLine")
4524 .value("id", 10_u64)
4525 .value("version", 1_i64)
4526 .value("name", "pg-rollback-line")
4527 .relation(
4528 "product",
4529 GraphNode::new("Product")
4530 .value("id", 100_u64)
4531 .value("name", "pg-rollback-sku"),
4532 ),
4533 ),
4534 )
4535 .unwrap();
4536
4537 tx_executor.rollback().await.unwrap();
4538
4539 let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders_tx")
4540 .fetch_one(&pool)
4541 .await
4542 .unwrap()
4543 .try_get("count")
4544 .unwrap();
4545 assert_eq!(count, 0);
4546
4547 sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4548 .execute(&pool)
4549 .await
4550 .unwrap();
4551 sqlx::query("DROP TABLE IF EXISTS product_tx")
4552 .execute(&pool)
4553 .await
4554 .unwrap();
4555 sqlx::query("DROP TABLE IF EXISTS orders_tx")
4556 .execute(&pool)
4557 .await
4558 .unwrap();
4559 }
4560}
4561pub use checker::{
4562 CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
4563 CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, clear_record_status,
4564 mark_record_status,
4565};